#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "lich_qos.h"

#include "volume_ctl.h"
#include "volume_proto.h"
#if 0
#include "rmsnap_bh.h"
#include "bh_task.h"
#include "lich_md.h"
#include "core.h"
#include "squeue.h"
#include "../chunk/chunk_proto.h"
#include "../replica/replica.h"
#include "../../storage/controller/stor_ctl.h"
#include "../../storage/storage/stor_rpc.h"
#include "../../storage/storage/vnode.h"
#include "../../storage/task/recovery.h"
#include "cache.h"
#include "md_proto.h"
#include "variable.h"
#include "ylog.h"
#include "dbg.h"
#include "timer.h"
#include "coroutine.h"
#endif
#include "volume_ctl_internal.h"
#include "recovery.h"


/*
 * List-linkable record bundling a chunk's info with a pool name and a parent
 * pool id. It is not referenced by the code visible in this part of the file.
 */
typedef struct {
        chkinfo_t chkinfo;
        /* NOTE(review): presumably reserves room for the replica array that
         * trails chkinfo, so it must stay directly after it — confirm against
         * the chkinfo_t definition */
        reploc_t __pad__[LICH_REPLICA_MAX];
        char pool[MAX_NAME_LEN]; /* pool name */
        poolid_t parent;         /* parent pool id */
        struct list_head list;   /* list linkage */
} arg_t;

/*
 * List node carrying one chunk id.
 * NOTE(review): presumably queued on recycle_worker_t.list — confirm at the
 * point of use, which is outside the visible part of this file.
 */
typedef struct {
        struct list_head hook; /* list linkage */
        chkid_t chkid;         /* chunk id carried by this node */
} recycle_entry_t;

/*
 * A worker handler paired with a list head (type of the file-scope `recycle`
 * pointer).
 * NOTE(review): the list presumably holds recycle_entry_t nodes — confirm.
 */
typedef struct {
        worker_handler_t handler; /* worker handle */
        struct list_head list;    /* head of the pending list */
} recycle_worker_t;

/*
 * List node carrying one chunk id.
 * NOTE(review): presumably queued on analysis_worker_t.list — confirm at the
 * point of use, which is outside the visible part of this file.
 */
typedef struct {
        struct list_head hook; /* list linkage */
        chkid_t chkid;         /* chunk id carried by this node */
} volume_analysis_entry_t;

/*
 * A worker handler paired with a list head.
 * NOTE(review): the list presumably holds volume_analysis_entry_t nodes —
 * confirm.
 */
typedef struct {
        worker_handler_t handler; /* worker handle */
        struct list_head list;    /* head of the pending list */
} analysis_worker_t;

/*
 * Context record describing one request against a volume. It is not
 * referenced by the code visible in this part of the file, so the field notes
 * below are assumptions drawn from the names — TODO confirm at the users.
 */
typedef struct {
        volid_t volid;  /* target volume */
        size_t size;    /* presumably the I/O length */
        off_t offset;   /* presumably the I/O offset */
        buffer_t *buf;  /* presumably the data buffer */

        chkid_t chkid;  /* chunk the request refers to */
        int retval;     /* presumably the result code handed back */
        nid_t *dist;    /* node id array */
        int dist_count; /* presumably the number of entries in dist */
} core_ctx_t;

/* Cache of volume entries, looked up by chunk id in __volume_ctl_get____(). */
static mcache_t *__vc_cache__;
/* NOTE(review): not referenced in the visible part of this file. */
static co_worker_t *worker;
/* NOTE(review): not referenced in the visible part of this file; be aware
 * that __drop() has a parameter with the same name, which shadows it there. */
static recycle_worker_t *recycle;

/*
 * Forward declarations of helpers that are called before their definitions
 * (some definitions lie outside the visible part of this file).
 */
STATIC int __volume_ctl_get(mcache_entry_t **_cent, const chkid_t *chkid);
STATIC int __volume_ctl_get__(mcache_entry_t **_cent, const chkid_t *chkid, const fileid_t *parent);
STATIC int __volume_ctl_try_load(const chkid_t *chkid, const fileid_t *_parent);
static void __volume_ctl_lease_worker_create(volume_proto_t *volume_proto);

/**
 * Equality predicate over two chunk ids passed as opaque pointers.
 *
 * @param s1 points to a chkid_t
 * @param s2 points to a chkid_t
 * @return 1 when chkid_cmp() reports 0 for the pair, 0 otherwise
 */
static int __cmp(const void *s1, const void *s2)
{
        const chkid_t *lhs = s1;
        const chkid_t *rhs = s2;

        if (chkid_cmp(lhs, rhs) == 0)
                return 1;

        return 0;
}

/**
 * Hash a chunk id by its `id` field alone.
 *
 * @param key points to a chkid_t
 * @return the id field converted to uint32_t
 */
static uint32_t __hash(const void *key)
{
        return ((const chkid_t *)key)->id;
}

/**
 * Adapter exposing core_hash() through a `const void *` key signature.
 *
 * @param key points to a chkid_t
 * @return whatever core_hash() yields for that id
 */
static uint32_t __core_hash(const void *key)
{
        return core_hash((const chkid_t *)key);
}

static void __entry_free(void *ent)
{
        DINFO("drop volume %p\n", ent);
        volume_proto_destroy(ent);
}

/**
 * Drop callback for a cached volume: log (when recycling) and free the value.
 *
 * @param value   the cached volume_proto_t, or NULL, in which case nothing is
 *                done
 * @param cent    owning cache entry; unused
 * @param recycle non-zero when the entry is being recycled; only affects
 *                logging. Note this parameter shadows the file-scope
 *                `recycle` pointer.
 * @return always 0
 */
static int __drop(void *value, mcache_entry_t *cent, int recycle)
{
        (void) cent;

        if (value == NULL)
                return 0;

        if (recycle) {
                volume_proto_t *volume_proto = value;

                /* CHKID_FORMAT must be paired with CHKID_ARG(), as everywhere
                 * else in this file; passing the raw pointer mismatched the
                 * format's arguments. */
                DINFO("recycle "CHKID_FORMAT"\n", CHKID_ARG(&volume_proto->chkid));
        }

        __entry_free(value);

        return 0;
}

/**
 * Release whichever lock is currently held on a cache entry; used after the
 * read-lock and write-lock helpers in this file alike.
 *
 * @param cent the locked cache entry
 */
STATIC void __volume_ctl_unlock(mcache_entry_t *cent)
{
        mcache_unlock(cent);
}

/**
 * Core-side handler for volume_ctl_lookup_srv().
 *
 * Variadic arguments (in order): const volid_t *volid, const poolid_t *parent.
 *
 * Steps: resolve the volume's recorded parent and pool name, run
 * md_proto_migrate() for it, fetch (loading if needed) the cache entry, then
 * create and set a lease for the volume.
 *
 * @return 0 on success; EREMCHG when md_proto_migrate() reports EPERM or the
 *         cache lookup reports ENOENT; otherwise the failing step's code.
 */
STATIC int __volume_ctl_lookup_srv(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        poolid_t poolid;
        const volid_t *volid = va_arg(ap, volid_t *);
        const poolid_t *parent = va_arg(ap, poolid_t *);

        va_end(ap);

#if 1
        char pool[MAX_NAME_LEN];
        poolid_t _parent;

        /* look up the parent id and pool name recorded for this volume */
        ret = replica_srv_getparent(volid, &_parent, pool);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = md_proto_migrate(pool, &_parent, volid, NULL);
        if (unlikely(ret)) {
                /* EPERM is reported to the caller as EREMCHG */
                if (ret == EPERM)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }
#endif

        /* the caller-supplied parent is passed through as a load hint */
        ret = __volume_ctl_get__(&cent, volid, parent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* poolid itself is not used; only the call's result
                         * matters before ENOENT is turned into EREMCHG */
                        ret = replica_srv_getparent(volid, &poolid, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = EREMCHG;
                        GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

#if 1
        /* this is a local lease object, distinct from volume_proto->lease */
        lease_t lease;
        ret = lease_create(&lease, volid);
        if (unlikely(ret))
                GOTO(err_release, ret);
        
        ret = lease_set(&lease);
        if (unlikely(ret))
                GOTO(err_release, ret);
#else
        
        ret = __volume_ctl_rdlock(cent);//check lease
        if (unlikely(ret))
                GOTO(err_release, ret);
        
        UNIMPLEMENTED(__NULL__);

        __volume_ctl_unlock(cent);
#endif
        
        __volume_ctl_release(cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Run the volume lookup on the core selected by core_hash(volid).
 *
 * @param volid  volume to look up
 * @param parent parent pool id forwarded to the handler
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_lookup_srv(const volid_t *volid, const poolid_t *parent)
{
        int ret = core_request(core_hash(volid), -1, "vctl_lookup",
                               __volume_ctl_lookup_srv, volid, parent);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Take the write lock on a cache entry and make sure its volume is current.
 *
 * If the locked volume reports needreload(), the lock is released,
 * __volume_ctl_try_load() is called for its chunk id and the sequence starts
 * over. Otherwise lease_set() is applied to the volume's lease.
 *
 * @param cent cache entry whose value is a volume_proto_t
 * @return 0 with the write lock held; otherwise an error code with the lock
 *         not held (after a lease_set() failure the entry is also dropped).
 */
STATIC int __volume_ctl_wrlock(mcache_entry_t *cent)
{
        int ret, retry = 0;
        volume_proto_t *volume_proto;
        chkid_t chkid;

retry:
        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        if (volume_proto->needreload(volume_proto)) {
                /* bound the number of reload attempts */
                YASSERT(retry < gloconf.rpc_timeout / 2);

                /* copy the id before unlocking; it is used after the lock is gone */
                chkid = volume_proto->chkid;
                __volume_ctl_unlock(cent);

                DWARN("chunk "CHKID_FORMAT" need reload, retry %u\n", CHKID_ARG(&chkid), retry);

                ret = __volume_ctl_try_load(&chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("chunk "CHKID_FORMAT" loaded, retry %u\n", CHKID_ARG(&chkid), retry);
                retry++;
                goto retry;
        } else {
                ret = lease_set(&volume_proto->lease);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        /* write lock is held here, so the no-lock drop variant is used */
        mcache_drop_nolock(cent);
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * Same flow as __volume_ctl_wrlock(), but the lock is taken with
 * mcache_wrlock_prio(cent, 1) and at most three reload rounds are tolerated.
 *
 * @param cent cache entry whose value is a volume_proto_t
 * @return 0 with the write lock held; otherwise an error code with the lock
 *         not held (after a lease_set() failure the entry is also dropped).
 */
STATIC int __volume_ctl_wrlock_prio(mcache_entry_t *cent)
{
        int ret, retry = 0;
        volume_proto_t *volume_proto;
        chkid_t chkid;

retry:
        ret = mcache_wrlock_prio(cent, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        if (volume_proto->needreload(volume_proto)) {
                /* bound the number of reload attempts */
                YASSERT(retry < 3);
                /* copy the id before unlocking; it is used after the lock is gone */
                chkid = volume_proto->chkid;
                __volume_ctl_unlock(cent);

                DWARN("chunk "CHKID_FORMAT" need reload\n", CHKID_ARG(&chkid));
                ret = __volume_ctl_try_load(&chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("chunk "CHKID_FORMAT" loaded, %u\n", CHKID_ARG(&chkid), retry);
                retry++;
                goto retry;
        } else {
                ret = lease_set(&volume_proto->lease);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        /* write lock is held here, so the no-lock drop variant is used */
        mcache_drop_nolock(cent);
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * Take the read lock on a cache entry and make sure its volume is current.
 *
 * If the locked volume reports needreload(), the lock is released,
 * __volume_ctl_try_load() is called for its chunk id and the sequence starts
 * over (at most three reload rounds are tolerated). Otherwise lease_set() is
 * applied to the volume's lease.
 *
 * @param cent cache entry whose value is a volume_proto_t
 * @return 0 with the read lock held; otherwise an error code with the lock
 *         not held (after a lease_set() failure the entry is also dropped).
 */
STATIC int IO_FUNC __volume_ctl_rdlock(mcache_entry_t *cent)
{
        int ret, retry = 0;
        volume_proto_t *volume_proto;
        chkid_t chkid;

retry:
        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO core: volume_proto may hold invalid content
        volume_proto = cent->value;

        YASSERT(volume_proto->needreload);
        if (unlikely(volume_proto->needreload(volume_proto))) {
                /* bound the number of reload attempts */
                YASSERT(retry < 3);
                /* copy the id before unlocking; it is used after the lock is gone */
                chkid = volume_proto->chkid;
                __volume_ctl_unlock(cent);

                DWARN("chunk "CHKID_FORMAT" need reload\n", CHKID_ARG(&chkid));
                ret = __volume_ctl_try_load(&chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("chunk "CHKID_FORMAT" loaded retry %u\n", CHKID_ARG(&chkid), retry);
                retry++;
                goto retry;
        } else {
                ret = lease_set(&volume_proto->lease);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        return 0;
err_lock:
        /* unlike the write-lock helpers, the read lock is released first and
         * the locking drop variant is used afterwards */
        __volume_ctl_unlock(cent);
        mcache_drop(cent);
err_ret:
        return ret;
}

/**
 * Drop a cache entry under the priority write lock.
 *
 * If the lock cannot be taken the entry is left alone and only a warning is
 * logged.
 *
 * @param cent cache entry to drop
 */
STATIC void __volume_ctl_drop(mcache_entry_t *cent)
{
        /* a failure here means the entry is already in the dropped state, so
         * there is no need to drop it again */
        const int ret = mcache_wrlock_prio(cent, 1);

        if (ret == 0) {
                DWARN("volume drop\n");
                mcache_drop_nolock(cent);

                __volume_ctl_unlock(cent);
        } else {
                DWARN("volume drop failed, ret %d\n", ret);
        }
}

/**
 * Reload the volume held by a cache entry if it reports needreload().
 *
 * The check is made under the read lock; if a reload is needed the read lock
 * is released and the priority write lock is taken for the whole reload. The
 * fresh volume_proto replaces cent->value and the old one is destroyed.
 *
 * @param pool   pool name passed to the metadata lookup and the loader
 * @param cent   cache entry whose value is a volume_proto_t
 * @param parent parent pool id passed to the metadata lookup and the loader
 * @param volid  id of the volume being checked
 * @return 0 when the volume was already current or was reloaded; otherwise an
 *         error code (errors after the write lock is taken also drop the
 *         entry).
 */
STATIC int __volume_ctl_check_reload(const char *pool, mcache_entry_t *cent,
                                     const poolid_t *parent, const volid_t *volid)
{
        int ret;
        volume_proto_t *volume_proto, *new;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo; /* stack buffer viewed as chkinfo_t */

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        if (volume_proto->needreload(volume_proto) == 0) {
                /* already current: nothing to do */
                __volume_ctl_unlock(cent);
                DINFO(""CHKID_FORMAT" loaded\n", CHKID_ARG(volid));
                goto out;
        }

        __volume_ctl_unlock(cent);

        /* we need protect the whole load process, not allow modify chunkinfo in loading */
        ret = mcache_wrlock_prio(cent, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("reload "CHKID_FORMAT" begin\n", CHKID_ARG(volid));

        ret = md_chunk_getinfo1(pool, parent, volid, chkinfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        ret = volume_proto_load(&new, pool, parent, chkinfo);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* re-read the value: the entry was unlocked between the two locks */
        volume_proto = cent->value;
        if (gettime() - volume_proto->uptime < 2) {
                DWARN("reload "CHKID_FORMAT" too fast\n", CHKID_ARG(volid));
        }

        DINFO("volume proto loaded "CHKID_FORMAT" %p old %p\n", CHKID_ARG(volid), new, volume_proto);
        /* NOTE(review): asserts nobody else reloaded the volume while this
         * path was between the read lock and the write lock — confirm that
         * cannot happen */
        YASSERT(volume_proto->needreload(volume_proto));
        volume_proto_destroy(volume_proto);
        cent->value = new;
        YASSERT(new->ltime);

        __volume_ctl_lease_worker_create(new);

        __volume_ctl_unlock(cent);

out:
        return 0;
err_lock:
        /* write lock is held here, so the no-lock drop variant is used */
        mcache_drop_nolock(cent);
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * Give back a reference on a cache entry (counterpart of the get/ref helpers
 * in this file). The body is the same as __volume_ctl_deref().
 *
 * @param cent cache entry to release
 */
void __volume_ctl_release(mcache_entry_t *cent)
{
        // this will call volume_proto_destroy
        mcache_release(cent);

#if ENABLE_SCHEDULE_LOCK_CHECK
        /* mirror of the schedule_lock_set(0, 1) done when the entry was obtained */
        schedule_lock_set(0, -1);
#endif
}

/**
 * Give back a reference on a cache entry. The body is the same as
 * __volume_ctl_release().
 *
 * @param cent cache entry to release
 */
void __volume_ctl_deref(mcache_entry_t *cent)
{
        // this will call volume_proto_destroy
        mcache_release(cent);

#if ENABLE_SCHEDULE_LOCK_CHECK
        /* mirror of the schedule_lock_set(0, 1) done when the entry was obtained */
        schedule_lock_set(0, -1);
#endif
}

/**
 * Take an extra reference on a cache entry.
 *
 * @param cent cache entry to reference
 * @return 0 on success, otherwise the code returned by mcache_ref()
 */
STATIC int __volume_ctl_ref(mcache_entry_t *cent)
{
        int ret = mcache_ref(cent);

        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_SCHEDULE_LOCK_CHECK
        /* account for the reference; undone with (0, -1) on release */
        schedule_lock_set(0, 1);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Look up a volume's entry in the volume cache without attempting a load.
 *
 * @param _cent out: the entry found; written only on success
 * @param chkid id to look up
 * @return 0 on success, otherwise the code returned by mcache_get()
 */
STATIC int __volume_ctl_get____(mcache_entry_t **_cent, const chkid_t *chkid)
{
        mcache_entry_t *found;
        int ret = mcache_get(__vc_cache__, chkid, &found);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_cent = found;
#if ENABLE_SCHEDULE_LOCK_CHECK
        /* account for the reference; undone with (0, -1) on release */
        schedule_lock_set(0, 1);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Get a volume's cache entry, loading the volume when it is not cached.
 *
 * On ENOENT from the cache, __volume_ctl_try_load() is called and the lookup
 * is repeated; the loop has no retry bound of its own.
 *
 * @param _cent  out: the entry found; written only on success
 * @param chkid  volume id; its id field must be non-zero
 * @param parent parent id passed to the loader (callers in this file may
 *               pass NULL)
 * @return 0 on success, otherwise the code of the failing lookup or load
 */
STATIC int IO_FUNC __volume_ctl_get__(mcache_entry_t **_cent, const chkid_t *chkid, const fileid_t *parent)
{
        int ret, retry = 0;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        YASSERT(chkid->id);

        ANALYSIS_BEGIN(0);
retry:
        ret = __volume_ctl_get____(&cent, chkid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* not cached: load it, then look it up again */
                        DBUG("chunk "CHKID_FORMAT" need reload\n", CHKID_ARG(chkid));
                        ret = __volume_ctl_try_load(chkid, parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        DINFO("chunk "CHKID_FORMAT" loaded, %u\n", CHKID_ARG(chkid), retry);
                        retry++;
                        goto retry;
                } else {
                        GOTO(err_ret, ret);
                }
        }

        /* NOTE(review): presumably asserts the elapsed time stays within
         * three RPC timeouts — confirm against the ANALYSIS_ASSERT definition */
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);

        /* the cached volume must be the one that was asked for */
        volume_proto = cent->value;
        YASSERT(chkid_cmp(&volume_proto->chkid, chkid) == 0);
        *_cent = cent;

        return 0;
err_ret:
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);
        return ret;
}

/**
 * Get a volume's cache entry with no parent hint; forwards to
 * __volume_ctl_get__() with a NULL parent.
 *
 * @param _cent out: the entry found; written only on success
 * @param chkid volume id
 * @return 0 on success, otherwise the code from __volume_ctl_get__()
 */
STATIC int IO_FUNC __volume_ctl_get(mcache_entry_t **_cent, const chkid_t *chkid)
{
        return __volume_ctl_get__(_cent, chkid, NULL);
}


// -- API

/**
 * Core-side handler for volume_ctl_chunk_getinfo().
 *
 * Variadic arguments (in order): const volid_t *volid, const chkid_t *chkid,
 * chkinfo_t *chkinfo (output).
 *
 * @return 0 on success. ENOENT from the cache lookup is reported as EREMCHG;
 *         ENOENT from the volume's chunk_getinfo() is reported as ENOKEY
 *         unless @chkid is itself a volume chunk; other codes pass through.
 */
STATIC int __volume_ctl_chunk_getinfo(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_getinfo(volume_proto, chkid, chkinfo);
        if (unlikely(ret)) {
                if (ret == ENOENT && chkid->type != __VOLUME_CHUNK__)
                        ret = ENOKEY;
                GOTO(err_lock, ret);
        }

        /* the info returned must describe the chunk that was asked for */
        YASSERT(chkid_cmp(chkid, &chkinfo->id) == 0);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Fetch the chunk info of @chkid from volume @volid, on the core selected by
 * core_hash(volid).
 *
 * @param volid   owning volume
 * @param chkid   chunk to query
 * @param chkinfo out: filled by the handler
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_chunk_getinfo(const volid_t *volid, const chkid_t *chkid, chkinfo_t *chkinfo)
{
        int ret = core_request(core_hash(volid), -1, "vctl_chunk_getinfo",
                               __volume_ctl_chunk_getinfo, volid, chkid, chkinfo);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_getpool().
 *
 * Variadic arguments (in order): const volid_t *volid, char *pool (output).
 * The pool name is copied with strcpy(), so the caller's buffer must be large
 * enough for it (NOTE(review): presumably MAX_NAME_LEN — confirm at callers).
 *
 * @return 0 on success, otherwise the code of the failing lookup or lock
 */
STATIC int __volume_ctl_getpool(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        char *pool = va_arg(ap, char *);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        DBUG("chunk "CHKID_FORMAT" %p\n", CHKID_ARG(volid), volid);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        /* copy the name out while the read lock is still held */
        strcpy(pool, volume_proto->table1.pool);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Copy the pool name of volume @volid into @pool, on the core selected by
 * core_hash(volid).
 *
 * @param volid volume id; must be of type __VOLUME_CHUNK__
 * @param pool  out: receives the NUL-terminated pool name
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_getpool(const volid_t *volid, char *pool)
{
        int ret;

        YASSERT(volid->type == __VOLUME_CHUNK__);
        DBUG("chunk %s %p\n", id2str(volid), volid);

        ret = core_request(core_hash(volid), -1, "getpool",
                           __volume_ctl_getpool, volid, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_getattr().
 *
 * Variadic arguments (in order): const volid_t *volid, fileinfo_t *fileinfo
 * (output). The volume's getattr() method is invoked under the read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_getattr(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        fileinfo_t *fileinfo = va_arg(ap, fileinfo_t *);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        DBUG("chunk "CHKID_FORMAT" %p\n", CHKID_ARG(volid), volid);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->getattr(volume_proto, fileinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Read the attributes of volume @volid, on the core selected by
 * core_hash(volid).
 *
 * @param volid    volume id; must be of type __VOLUME_CHUNK__
 * @param fileinfo out: filled by the handler
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_getattr(const volid_t *volid, fileinfo_t *fileinfo)
{
        int ret;

        YASSERT(volid->type == __VOLUME_CHUNK__);
        DBUG("chunk %s %p\n", id2str(volid), volid);

        ret = core_request(core_hash(volid), -1, "getattr",
                           __volume_ctl_getattr, volid, fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Argument handed to __snap_resize() for each snapshot visited. */
typedef struct {
        char *pool;    /* pool name; used for logging in __snap_resize() */
        uint64_t size; /* new size to apply to the snapshot */
} snap_resize_arg_t;

/**
 * Snapshot-iterator callback: set one snapshot's size via md_setattr().
 *
 * @param _arg  snap_resize_arg_t with the pool name and the new size
 * @param chkid id of the volume being iterated (used for logging only)
 * @param _snap snap_t describing the snapshot to resize
 * @return 0 on success, otherwise the code returned by md_setattr()
 */
static int __snap_resize(void *_arg, void *chkid, void *_snap)
{
        int ret;
        snap_resize_arg_t *resize = _arg;
        fileid_t *fileid = chkid;
        snap_t *snap = _snap;
        setattr_t setattr;

        DINFO("resize %s/"CHKID_FORMAT" snap %s ("CHKID_FORMAT") size %ju\n",
              resize->pool, CHKID_ARG(fileid), snap->key, CHKID_ARG(&snap->chkinfo->id), resize->size);

#if 0
        ret = stor_lookup(resize->pool, fileid, snap->key, &snapid);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        /* request a size change only; every other field stays zeroed */
        memset(&setattr, 0x0, sizeof(setattr));
        setattr.size.size = resize->size;
        setattr.size.set_it = 1;

        ret = md_setattr(&snap->chkinfo->id, NULL, &setattr);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Apply @size to every snapshot of a volume by running __snap_resize() over
 * the volume's snapshot iterator.
 *
 * @param volume_proto volume whose snapshots are resized
 * @param size         new size
 * @return 0 on success, otherwise the code returned by the iterator
 */
STATIC int __volume_ctl_snapshot_resize(volume_proto_t *volume_proto, uint64_t size)
{
        snap_resize_arg_t arg = {
                .pool = volume_proto->table1.pool,
                .size = size
        };
        int ret = volume_proto->table1.snapshot_iterator2(&volume_proto->table1,
                                                          __snap_resize, &arg);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_setattr().
 *
 * Variadic arguments (in order): const volid_t *volid, fileinfo_t *fileinfo,
 * const setattr_t *setattr.
 *
 * When a size change is requested and the volume's attr does not carry
 * __FILE_ATTR_SNAPSHOT__, all snapshots of the volume are resized before the
 * volume's own setattr() runs. After a size change the volume's ltime is
 * zeroed (NOTE(review): presumably so that needreload() fires on next use —
 * confirm against volume_proto).
 *
 * @return 0 on success, otherwise the code of the failing step. Every error
 *         path taken after the read lock is acquired releases it.
 */
STATIC int __volume_ctl_setattr(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        fileinfo_t *fileinfo = va_arg(ap, fileinfo_t *);
        const setattr_t *setattr = va_arg(ap, setattr_t *);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        if (setattr->size.set_it && !(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                // resize all snapshots of the volume first, then the volume itself
                ret = __volume_ctl_snapshot_resize(volume_proto, setattr->size.size);
                if (unlikely(ret)) {
                        /* the read lock is held here: leave through err_lock
                         * so it is released (err_release would leak it) */
                        GOTO(err_lock, ret);
                }
        }

        ret = volume_proto->setattr(volume_proto, fileinfo, setattr);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        if (setattr->size.set_it) {
                /**
                 * The volume size changed, so force a reload.
                 *
                 * @todo does this step need the write lock?
                 */
                DWARN("resize %s to %ju\n", id2str(&volume_proto->chkid), setattr->size.size);
                volume_proto->ltime = 0;
        }

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Apply @setattr to volume @volid, on the core selected by core_hash(volid).
 *
 * @param volid    target volume
 * @param fileinfo forwarded to the volume's setattr() method
 * @param setattr  attribute changes to apply
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_setattr(const volid_t *volid, fileinfo_t *fileinfo, const setattr_t *setattr)
{
        int ret = core_request(core_hash(volid), -1, "vctl_setattr",
                               __volume_ctl_setattr, volid, fileinfo, setattr);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_chunk_allocate().
 *
 * Variadic arguments (in order): const volid_t *volid, const chkid_t *chkid,
 * int chknum, int fill.
 *
 * @return 0 on success; ECANCELED when the volume's attr carries
 *         __FILE_ATTR_DELETE__; otherwise the code of the failing step
 */
STATIC int __volume_ctl_chunk_allocate(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int chknum = va_arg(ap, int);
        int fill = va_arg(ap, int);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret, deleting;

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        /* refuse to allocate into a volume that is flagged for deletion */
        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        ret = volume_proto->chunk_allocate(volume_proto, chkid, chknum, fill);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Ask volume @volid to allocate chunks, on the core selected by
 * core_hash(volid).
 *
 * @param volid  owning volume
 * @param chkid  chunk id(s) forwarded to the volume's chunk_allocate()
 * @param chknum forwarded to the volume's chunk_allocate()
 * @param fill   forwarded to the volume's chunk_allocate()
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_chunk_allocate(const volid_t *volid,
                const chkid_t *chkid, int chknum, int fill)
{
        // TODO core: yield timeout
        int ret = core_request(core_hash(volid), -1, "chunk_allocate",
                               __volume_ctl_chunk_allocate, volid,
                               chkid, chknum, fill);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_chunk_set().
 *
 * Variadic arguments (in order): const volid_t *volid, const chkid_t *chkid,
 * const nid_t *nid, int status. The volume's chunk_set() method is invoked
 * under the read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_chunk_set(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        int status = va_arg(ap, int);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_set(volume_proto, chkid, nid, status);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Set the status of one replica of a chunk. When a disk is pulled, this is
 * triggered in large numbers within a short period.
 *
 * @see disk_sqlite3.c: __disk_lost__
 *
 * @param volid  volume that owns the chunk
 * @param chkid  chunk whose replica status is set
 * @param nid    node holding the replica
 * @param status __S_CLEAN | __S_CHECK | __S_DIRTY | __S_OFFLINE
 *
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_chunk_set(const volid_t *volid, const chkid_t *chkid,
                         const nid_t *nid, int status)
{
        int ret;

        ANALYSIS_BEGIN(0);

        /* NOTE(review): this dispatches on core_hash(chkid), whereas the other
         * wrappers in this file use core_hash(volid) — confirm both select
         * the same core for a chunk and its volume */
        ret = core_request(core_hash(chkid), -1, "chunk_set", __volume_ctl_chunk_set,
                           volid, chkid, nid, status);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* reached on the success path only */
        ANALYSIS_QUEUE(0, IO_WARN, "volume_ctl_chunk_set");

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_vfm_set().
 *
 * Variadic arguments (in order): const volid_t *volid, const chkid_t *chkid,
 * const vfm_t *vfm. The volume's vfm_set_dangerously() method is invoked
 * under the read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_vfm_set(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const vfm_t *vfm = va_arg(ap, vfm_t *);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->vfm_set_dangerously(volume_proto, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Store @vfm for chunk @chkid of volume @volid, on the core selected by
 * core_hash(volid).
 *
 * @param volid owning volume
 * @param chkid chunk the vfm belongs to
 * @param vfm   value forwarded to the volume's vfm_set_dangerously()
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_vfm_set(const volid_t *volid, const chkid_t *chkid,
                       const vfm_t *vfm)
{
        int ret;

        ANALYSIS_BEGIN(0);

        ret = core_request(core_hash(volid), -1, "vfm_set", __volume_ctl_vfm_set,
                           volid, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* reached on the success path only */
        ANALYSIS_QUEUE(0, IO_WARN, "volume_ctl_vfm_set");

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_vfm_get().
 *
 * Variadic arguments (in order): const volid_t *volid, const chkid_t *chkid,
 * vfm_t *vfm (output). The volume's vfm_get() method is invoked under the
 * read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_vfm_get(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        vfm_t *vfm = va_arg(ap, vfm_t *);

        ANALYSIS_BEGIN(0);
        
        va_end(ap);

        DBUG("vfm get "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->vfm_get(volume_proto, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        /* reached on the success path only */
        ANALYSIS_QUEUE(0, IO_WARN, "volume_ctl_vfm_get");
        
        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Read the vfm of chunk @chkid of volume @volid, on the core selected by
 * core_hash(volid).
 *
 * @param volid owning volume
 * @param chkid chunk to query
 * @param vfm   out: filled by the handler
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_vfm_get(const volid_t *volid, const chkid_t *chkid,
                       vfm_t *vfm)
{
        int ret;

        DBUG("vfm get "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = core_request(core_hash(volid), -1, "vfm_get",
                           __volume_ctl_vfm_get, volid, chkid, vfm);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_vfm_stat().
 *
 * Variadic arguments (in order): const volid_t *volid, int *count (output).
 * The volume's vfm_stat() method is invoked under the read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_vfm_stat(va_list ap)
{
        int ret;
        const volid_t *volid = va_arg(ap, volid_t *);
        int *count = va_arg(ap, int *);

        va_end(ap);

        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        ANALYSIS_BEGIN(0);

        DBUG("vol "CHKID_FORMAT"\n", CHKID_ARG(volid));

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->vfm_stat(volume_proto, count);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        /* reached on the success path only */
        ANALYSIS_QUEUE(0, IO_WARN, "volume_ctl_vfm_stat");

        DBUG("vol "CHKID_FORMAT" vfm %d\n", CHKID_ARG(volid), *count);
        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Query the vfm count of volume @volid, on the core selected by
 * core_hash(volid).
 *
 * @param volid volume to query
 * @param count out: filled by the volume's vfm_stat() method
 * @return 0 on success, otherwise the code returned by core_request()
 */
int volume_ctl_vfm_stat(const volid_t *volid, int *count)
{
        int ret;

        /* the request label was "vfm_get", copy-pasted from the sibling; use
         * this operation's own name so it is not confused with vfm_get */
        ret = core_request(core_hash(volid), -1, "vfm_stat", __volume_ctl_vfm_stat, volid, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Verify that a volume's recorded parent can resolve the volume, repairing
 * the record from a caller-supplied parent when it cannot.
 *
 * The recorded parent and pool name are fetched, then the volume's chunk info
 * is looked up under that parent. If that lookup reports ENOENT and @_parent
 * is given and differs from the recorded parent, the lookup is tried under
 * @_parent; on success the record is rewritten to @_parent and the check is
 * run once more.
 *
 * @param chkid   volume id; must be of type __VOLUME_CHUNK__
 * @param _parent alternative parent to try, or NULL
 * @param pool    out: receives the pool name from replica_srv_getparent()
 * @return 0 on success; EREMCHG when the parent record is missing (ENOENT);
 *         otherwise the code of the failing step
 */
STATIC int __volume_ctl_check_parent(const chkid_t *chkid,
                                     const fileid_t *_parent, char *pool)
{
        int ret, retry = 0;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        poolid_t parent;

        YASSERT(chkid->type == __VOLUME_CHUNK__);

        DBUG("try load volume %s\n", id2str(chkid));

retry:
        ret = replica_srv_getparent(chkid, &parent, pool);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        /* stack buffer viewed as chkinfo_t; only the lookup's result is used */
        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(pool, &parent, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT" parent "CHKID_FORMAT", ret: %d\n",
                      CHKID_ARG(chkid), CHKID_ARG(&parent), ret);

                if (ret == ENOENT && _parent && chkid_cmp(&parent, _parent)) {
                        /* the repair is attempted at most once */
                        YASSERT(retry == 0);

                        DWARN("try load "CHKID_FORMAT" from "CHKID_FORMAT"\n",
                              CHKID_ARG(chkid), CHKID_ARG(_parent));

                        ret = md_chunk_getinfo1(pool, _parent, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        /* the volume resolves under _parent: record that */
                        ret = replica_srv_setparent(chkid, _parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        retry++;
                        goto retry;
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Core-side handler for volume_ctl_chunk_migrate().
 *
 * Variadic arguments (in order): const poolid_t *poolid, const volid_t *volid,
 * uint32_t force (ignored), chkinfo_t *chkinfo (output).
 *
 * After the parent check and md_proto_migrate() succeed, the volume is
 * fetched and the chunk info of its table1 table_proto is copied into
 * @chkinfo under the read lock.
 *
 * @return 0 on success, otherwise the code of the failing step
 */
STATIC int __volume_ctl_chunk_migrate(va_list ap)
{
        const poolid_t *poolid = va_arg(ap, poolid_t *);
        const volid_t *volid = va_arg(ap, volid_t *);
        uint32_t force = va_arg(ap, uint32_t);
        chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        table_proto_t *table_proto;
        table1_t *table1;
        char pool[MAX_NAME_LEN];
        int ret;

        va_end(ap);

        (void) force;

        ret = __volume_ctl_check_parent(volid, poolid, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_proto_migrate(pool, poolid, volid, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* hand back the chunk info held by the loaded volume */
        volume_proto = cent->value;
        table1 = &volume_proto->table1;
        table_proto = table1->table_proto;
        CHKINFO_CP(chkinfo, table_proto->chkinfo);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_chunk_migrate() on the core selected by core_hash(volid).
 *
 * volid must be of type __VOLUME_CHUNK__ (asserted).  Returns 0 on success or
 * the error reported by core_request().
 */
int volume_ctl_chunk_migrate(const poolid_t *poolid, const volid_t *volid, uint32_t force,
                             chkinfo_t *chkinfo)
{
        int ret;

        YASSERT(volid->type == __VOLUME_CHUNK__);

        ret = core_request(core_hash(volid), -1, "chunk_migrate",
                           __volume_ctl_chunk_migrate,
                           poolid, volid, force, chkinfo);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Core-side handler behind volume_ctl_chunk_reject().
 *
 * Arguments are taken from @ap in this order: volid, chkid, nid, chkinfo.
 * The volume's chunk_reject callback is invoked with the cache entry
 * write-locked.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_chunk_reject(va_list ap)
{
        int ret;
        mcache_entry_t *entry;
        volume_proto_t *proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        proto = entry->value;
        ret = proto->chunk_reject(proto, chkid, nid, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_chunk_reject() on the core selected by core_hash(volid).
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_chunk_reject(const volid_t *volid, const chkid_t *chkid, const nid_t *nid,
                          chkinfo_t *chkinfo)
{
        int ret;

        ret = core_request(core_hash(volid), -1, "chunk_reject",
                           __volume_ctl_chunk_reject,
                           volid, chkid, nid, chkinfo);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Core-side handler behind volume_ctl_chunk_update().
 *
 * Arguments are taken from @ap in this order: volid, chkinfo, owner,
 * info_version.  The volume's chunk_update callback is invoked with the cache
 * entry read-locked; the cached volume must be a __VOLUME_CHUNK__ (asserted).
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_chunk_update(va_list ap)
{
        int ret;
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);
        const nid_t *owner = va_arg(ap, nid_t *);
        uint64_t info_version = va_arg(ap, uint64_t);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_update(volume_proto, chkinfo, owner, info_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_chunk_update() on the core selected by core_hash(volid).
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_chunk_update(const volid_t *volid, const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        int ret;

        ret = core_request(core_hash(volid), -1, "chunk_update",
                           __volume_ctl_chunk_update,
                           volid, chkinfo, owner, info_version);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Post-write step: call volume_proto_truncate() on the cached volume with the
 * offset and size of the write that just completed, holding the read lock.
 *
 * cent->value must be non-NULL (asserted).  Returns 0 on success, otherwise
 * the error from locking or from volume_proto_truncate().
 */
STATIC int __volume_ctl_post_write(mcache_entry_t *cent, uint32_t size, uint64_t offset)
{
        int ret;

        YASSERT(cent->value);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* note the argument order: offset first, then size */
        ret = volume_proto_truncate(cent->value, offset, size);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * Write `buf` to the volume identified by io->id.
 *
 * The cached volume entry is looked up and read-locked.  Under the lock the
 * I/O is optionally recorded for analysis (when `record` is non-zero), latency
 * accounting is started and volume_proto_write() is performed.  Once the lock
 * is dropped, __volume_ctl_post_write() is called with the I/O size/offset and
 * the latency sample is recorded.
 *
 * Returns 0 on success, otherwise the error code of the failing step.  The
 * error paths return without reaching ANALYSIS_END().
 */
int volume_ctl_write(const io_t *io, const buffer_t *buf, int record)
{
        int ret, latency_record;
        time_t begin;
        mcache_entry_t *cent;

        DBUG("write "CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(&io->id), (LLU)io->offset, io->size);

        ANALYSIS_BEGIN(0);
        
        ret = __volume_ctl_get(&cent, &io->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        if (likely(record)) {
                ret = volume_proto_analysis_record(cent->value, io->size, __OP_WRITE);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        ret = volume_proto_latency_begin(cent->value, &begin, &latency_record, __OP_WRITE);
        if (ret)
                GOTO(err_lock, ret);

        ret = volume_proto_write(cent->value, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* the read lock is released here; post_write takes it again itself */
        __volume_ctl_unlock(cent);

        ret = __volume_ctl_post_write(cent, io->size, io->offset);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* a failure to record the latency sample is deliberately ignored */
        ret = volume_proto_latency_record(cent->value, begin, latency_record, __OP_WRITE);
        if (unlikely(ret)) {
                //GOTO(err_release, ret);
        }

        __volume_ctl_release(cent);

        ANALYSIS_END(0, IO_WARN, NULL);
        
        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Disabled code (compiled out by #if 0): a core_request0()-based wrapper that
 * forwarded a plain (volid, buf, size, offset) write to volume_ctl_write()
 * with analysis recording turned off.
 */
#if 0
STATIC void __volume_ctl_write_data(void *_core_ctx)
{
        int ret;
        core_ctx_t *core_ctx = _core_ctx;
        io_t io;

        io_init(&io, &core_ctx->volid, NULL, core_ctx->offset, core_ctx->size, 0);

        ret = volume_ctl_write(&io, core_ctx->buf, 0);
        if (ret)
                GOTO(err_ret, ret);

        core_ctx->retval = 0;
        return;
err_ret:
        core_ctx->retval = ret;
        return;
}

int volume_ctl_write_data(const volid_t *volid, const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        core_ctx_t core_ctx;

        core_ctx.volid = *volid;
        core_ctx.size = size;
        core_ctx.offset = offset;
        core_ctx.buf = (buffer_t *)buf;
        core_ctx.retval = 0;

        ret = core_request0(core_hash(volid), __volume_ctl_write_data,
                        &core_ctx, "write_data");
        if (ret)
        GOTO(err_ret, ret);

        ret = core_ctx.retval;
        if (ret)
                GOTO(err_ret, ret);

        return 0;
        err_ret:
        return ret;
}
#endif

/*
 * Call volume_proto_unmap() for the range described by `io` on the volume
 * io->id, with the cache entry read-locked.  `record` is accepted but unused.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int volume_ctl_unmap(const io_t *io, int record)
{
        int ret;
        mcache_entry_t *entry;

        (void) record;

        ret = __volume_ctl_get(&entry, &io->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_unmap(entry->value, io);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Write `size` bytes from `buf` at `offset` into table chunk `chkid` of volume
 * `volid` via volume_proto_table_write(), with the cache entry write-locked.
 * After a successful write the volume's ltime field is reset to 0.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int __volume_ctl_table_write__(const volid_t *volid, const chkid_t *chkid,
                const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        mcache_entry_t *entry;
        volume_proto_t *proto;

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_table_write(entry->value, chkid, buf, size, offset);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* the table changed: clear ltime on the cached volume */
        proto = entry->value;
        proto->ltime = 0;
        DINFO(""CHKID_FORMAT" reset\n", CHKID_ARG(volid));

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * core_request0() callback: unpack the core_ctx_t, run
 * __volume_ctl_table_write__() and store its result in core_ctx->retval.
 */
STATIC void __volume_ctl_table_write(void *_core_ctx)
{
        int ret;
        core_ctx_t *ctx = _core_ctx;

        ret = __volume_ctl_table_write__(&ctx->volid, &ctx->chkid,
                                         ctx->buf, ctx->size, ctx->offset);
        if (ret == 0) {
                ctx->retval = 0;
                return;
        }

        GOTO(err_ret, ret);
err_ret:
        ctx->retval = ret;
        return;
}

/*
 * Public entry for table writes: packs the arguments into a core_ctx_t and
 * runs __volume_ctl_table_write() through core_request0() on the core
 * selected by core_hash(volid).
 *
 * Returns 0 on success, the core_request0() error, or the error the callback
 * left in retval.
 */
int volume_ctl_table_write(const volid_t *volid, const chkid_t *chkid,
                const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        core_ctx_t ctx;

        ctx.retval = 0;
        ctx.volid = *volid;
        ctx.chkid = *chkid;
        ctx.buf = (buffer_t *)buf;
        ctx.offset = offset;
        ctx.size = size;

        ret = core_request0(core_hash(volid), __volume_ctl_table_write,
                            &ctx, "table_write");
        if (ret)
                GOTO(err_ret, ret);

        /* the request itself went through; now surface the callback's result */
        ret = ctx.retval;
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Core-side handler behind volume_ctl_newchunk().
 *
 * Arguments are taken from @ap in this order: volid, chkid, buf.  Calls
 * volume_proto_newchunk() with the cache entry read-locked.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_newchunk(va_list ap)
{
        int ret;
        mcache_entry_t *entry;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const buffer_t *buf = va_arg(ap, buffer_t *);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_newchunk(entry->value, chkid, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_newchunk() on the core selected by core_hash(volid).
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_newchunk(const volid_t *volid, const chkid_t *chkid, const buffer_t *buf)
{
        int ret;

        ret = core_request(core_hash(volid), -1, "newchunk",
                           __volume_ctl_newchunk,
                           volid, chkid, buf);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Core-side handler behind volume_ctl_chunk_exist().
 *
 * Arguments are taken from @ap in this order: volid, chkid, exist.  Calls
 * volume_proto_chunk_exist() with the cache entry read-locked; the answer is
 * delivered through `exist`.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_chunk_exist(va_list ap)
{
        int ret;
        mcache_entry_t *entry;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int *exist = va_arg(ap, int *);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_chunk_exist(entry->value, chkid, exist);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_chunk_exist() on the core selected by core_hash(volid).
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_chunk_exist(const volid_t *volid, const chkid_t *chkid, int *exist)
{
        int ret;

        ret = core_request(core_hash(volid), -1, "chunk_exist",
                           __volume_ctl_chunk_exist,
                           volid, chkid, exist);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Core-side handler behind volume_ctl_discard().
 *
 * Arguments are taken from @ap in this order: volid, chkid.  Calls
 * volume_proto_discard() with the cache entry read-locked.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_discard(va_list ap)
{
        int ret;
        mcache_entry_t *entry;
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_discard(entry->value, chkid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run __volume_ctl_discard() on the core selected by core_hash(volid).
 * ANALYSIS_END() is reached on the success path only.
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_discard(const volid_t *volid, const chkid_t *chkid)
{
        int ret;

        ANALYSIS_BEGIN(0);

        ret = core_request(core_hash(volid), -1, "discard",
                           __volume_ctl_discard,
                           volid, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

/*
 * Write `buf` to a volume whose cache entry the caller already holds.
 *
 * `cent` must already be referenced (asserted); an extra reference is taken
 * for the duration of the call and dropped with __volume_ctl_deref() on every
 * path after it was obtained.  Under the read lock the I/O is optionally
 * recorded (when `record` is non-zero), latency accounting is started and
 * volume_proto_write() is performed; after unlocking,
 * __volume_ctl_post_write() runs and the latency sample is recorded.
 *
 * If a step performed under the lock fails with EREMCHG, the entry is dropped
 * from the cache via mcache_drop_nolock() before unlocking.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int volume_ctl_write_direct(mcache_entry_t *cent, const io_t *io, const buffer_t *buf, int record)
{
        int ret, latency_record;
        time_t begin;

        DBUG("write "CHKID_FORMAT" cent %p\n", CHKID_ARG(&io->id), cent);

        YASSERT(cent->ref);
        ret = __volume_ctl_ref(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        if (likely(record)) {
                ret = volume_proto_analysis_record(cent->value, io->size, __OP_WRITE);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        ret = volume_proto_latency_begin(cent->value, &begin, &latency_record, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = volume_proto_write(cent->value, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);

        ret = __volume_ctl_post_write(cent, io->size, io->offset);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* a failure to record the latency sample is deliberately ignored */
        ret = volume_proto_latency_record(cent->value, begin, latency_record, __OP_WRITE);
        if (unlikely(ret)) {
                //GOTO(err_release, ret);
        }

        __volume_ctl_deref(cent);

        //YASSERT(cent->ref);
        return 0;
err_lock:
        /* EREMCHG: evict the entry while the lock is still held */
        if (ret == EREMCHG) {
                DWARN("drop "CHKID_FORMAT" %p\n", CHKID_ARG(&io->id), cent->value);
                mcache_drop_nolock(cent);
        }

        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_deref(cent);
err_ret:
        //YASSERT(cent->ref);
        return ret;
}

/*
 * Read the range described by `io` from volume io->id into `buf`.
 *
 * The cached volume entry is looked up and read-locked.  When `record` is
 * non-zero the I/O is recorded for analysis, but a failure of that step is
 * ignored.  Latency accounting brackets volume_proto_read(); a failure to
 * record the latency sample is ignored as well.  ANALYSIS_END() is reached on
 * both the success and the error path.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int IO_FUNC volume_ctl_read(const io_t *io, buffer_t *buf, int record)
{
        int ret, latency_record;
        time_t begin;
        mcache_entry_t *cent;

        ANALYSIS_BEGIN(0);

        DBUG("read "CHKID_FORMAT" offset %llu size %u\n", CHKID_ARG(&io->id), (LLU)io->offset, io->size);

        ret = __volume_ctl_get(&cent, &io->id);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        if (likely(record)) {
                /* best effort: the result is overwritten by the next call */
                ret = volume_proto_analysis_record(cent->value, io->size, __OP_READ);
                if (unlikely(ret)) {
                        //GOTO(err_lock, ret);
                }
        }

        ret = volume_proto_latency_begin(cent->value, &begin, &latency_record, __OP_READ);
        if (ret)
                GOTO(err_lock, ret);

        ret = volume_proto_read(cent->value, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);

        ret = volume_proto_latency_record(cent->value, begin, latency_record, __OP_READ);
        if (unlikely(ret)) {
                //GOTO(err_release, ret);
        }
        __volume_ctl_release(cent);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/*
 * Read `size` bytes at `offset` from table chunk `chkid` of volume `volid`
 * into `buf` via volume_proto_table_read().  The cache entry is held with the
 * write lock for the duration of the read.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_table_read__(const volid_t *volid, const chkid_t *chkid,
                buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        mcache_entry_t *entry;

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_table_read(entry->value, chkid, buf, size, offset);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * core_request0() callback: unpack the core_ctx_t, run
 * __volume_ctl_table_read__() and store its result in core_ctx->retval.
 */
STATIC void __volume_ctl_table_read(void *_core_ctx)
{
        int ret;
        core_ctx_t *ctx = _core_ctx;

        ret = __volume_ctl_table_read__(&ctx->volid, &ctx->chkid,
                                        ctx->buf, ctx->size, ctx->offset);
        if (ret == 0) {
                ctx->retval = 0;
                return;
        }

        GOTO(err_ret, ret);
err_ret:
        ctx->retval = ret;
        return;
}

/*
 * Public entry for table reads: packs the arguments into a core_ctx_t and
 * runs __volume_ctl_table_read() through core_request0() on the core selected
 * by core_hash(volid).
 *
 * Returns 0 on success, the core_request0() error, or the error the callback
 * left in retval.
 */
int volume_ctl_table_read(const volid_t *volid, const chkid_t *chkid,
                buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        core_ctx_t ctx;

        ctx.retval = 0;
        ctx.volid = *volid;
        ctx.chkid = *chkid;
        ctx.buf = buf;
        ctx.offset = offset;
        ctx.size = size;

        ret = core_request0(core_hash(volid), __volume_ctl_table_read,
                            &ctx, "table_read");
        if (ret)
                GOTO(err_ret, ret);

        /* the request itself went through; now surface the callback's result */
        ret = ctx.retval;
        if (ret)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Read from a volume whose cache entry the caller already holds.
 *
 * `cent` must already be referenced (asserted); an extra reference is taken
 * for the duration of the call and dropped with __volume_ctl_deref() on every
 * path after it was obtained.  Under the read lock the I/O is optionally
 * recorded (when `record` is non-zero; a failure here aborts the read),
 * latency accounting is started and volume_proto_read() is performed.  A
 * failure to record the latency sample afterwards is ignored.
 *
 * If a step performed under the lock fails with EREMCHG, the entry is dropped
 * from the cache via mcache_drop_nolock() before unlocking.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int IO_FUNC volume_ctl_read_direct(mcache_entry_t *cent, const io_t *io, buffer_t *buf, int record)
{
        int ret, latency_record;
        time_t begin;

        YASSERT(cent->ref);
        ret = __volume_ctl_ref(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);


        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        if (likely(record)) {
                ret = volume_proto_analysis_record(cent->value, io->size, __OP_READ);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        ret = volume_proto_latency_begin(cent->value, &begin, &latency_record, __OP_READ);
        if (ret)
                GOTO(err_lock, ret);

        ret = volume_proto_read(cent->value, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);

        ret = volume_proto_latency_record(cent->value, begin, latency_record, __OP_READ);
        if (unlikely(ret)) {
                //GOTO(err_release, ret);
        }

        __volume_ctl_deref(cent);

        //YASSERT(cent->ref);
        return 0;
err_lock:
        /* EREMCHG: evict the entry while the lock is still held */
        if (ret == EREMCHG) {
                DWARN("drop "CHKID_FORMAT" %p\n", CHKID_ARG(&io->id), cent->value);
                mcache_drop_nolock(cent);
        }

        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_deref(cent);
err_ret:
        //YASSERT(cent->ref);
        return ret;
}

/*
 * Invoke the volume's chunk_check callback for `chkid` (third argument NULL)
 * with the cache entry read-locked.  The cached volume must be a
 * __VOLUME_CHUNK__ (asserted).
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int volume_ctl_chunk_check(const volid_t *volid, const chkid_t *chkid)
{
        int ret;
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_check(volume_proto, chkid, NULL);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Query replica_srv_diskonline() for `chkid`, then ask the metadata layer
 * (md_chunk_reject) to reject the local node, net_getnid(), for that chunk.
 * The online flag returned by the first call is not inspected.
 *
 * Returns 0 on success, otherwise the error of the failing call.
 */
STATIC int __volume_ctl_diskcheck(const char *pool, const volid_t *volid, const chkid_t *chkid)
{
        int ret;
        int disk_online;

        ret = replica_srv_diskonline(chkid, &disk_online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        //YASSERT(online == 0);

        ret = md_chunk_reject(pool, volid, chkid, net_getnid(), NULL, NULL);
        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/* Per-volume state handed to the lease renewal task. */
typedef struct {
        chkid_t chkid;  /* id of the volume whose lease is renewed */
        uint32_t magic; /* copy of volume_proto->magic taken when the worker was created */
        ytime_t time;   /* ytime_gettime() value at creation / at the last "success" log line */
} lease_check_t;

/*
 * One lease renewal round for the volume named in `lease_check`.
 *
 * With the cache entry read-locked, the renewal is abandoned when:
 *   - the volume reports needreload()                        -> ESTALE
 *   - md_chunk_getinfo1() fails                               -> that error
 *   - the first replica in the fetched chunk info is not local -> EREMCHG
 *   - the fetched info_version differs from the cached one   -> EPERM
 *   - the volume's magic no longer matches lease_check->magic -> ESTALE
 * Otherwise volume_proto_renew() is called; if that fails the entry is
 * dropped from the cache with mcache_drop_nolock().
 *
 * `i` is the caller's round counter; a success line is logged on every 100th
 * round and lease_check->time is refreshed at that point.
 *
 * Returns 0 when the lease was renewed, non-zero otherwise.
 */
STATIC int __volume_ctl_lease(lease_check_t *lease_check, uint64_t i)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        ret = __volume_ctl_get(&cent, &lease_check->chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        if (volume_proto->needreload(volume_proto)) {
                ret = ESTALE;
                DINFO("renew "CHKID_FORMAT" exit, need reload\n", CHKID_ARG(&lease_check->chkid));
                GOTO(err_lock, ret);
       }

#if 1
        table1_t *table1;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];

        table1 = &volume_proto->table1;
        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(table1->pool, &table1->parent,
                                &volume_proto->chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DINFO("renew "CHKID_FORMAT" exit, getinfo fail ret %d\n", CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }

        /*
         * This step cannot guarantee that the current node still holds the
         * lease: another node may already have acquired it and changed the
         * replica distribution through migrate.
         */
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                DINFO("renew "CHKID_FORMAT" exit, %d \n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }

        //YASSERT(chkinfo->info_version == table1->table_proto->chkinfo->info_version);
        if (chkinfo->info_version != table1->table_proto->chkinfo->info_version) {
                ret = EPERM;
                CHKINFO_DUMP(chkinfo, D_INFO);
                CHKINFO_DUMP(table1->table_proto->chkinfo, D_INFO);
                DINFO("renew "CHKID_FORMAT" exit, %d \n",
                      CHKID_ARG(&lease_check->chkid), ret);
                GOTO(err_lock, ret);
        }
#endif

        /* a different magic means the volume was loaded again since this worker started */
        if (volume_proto->magic != lease_check->magic) {
                ret = ESTALE;
                DINFO("renew "CHKID_FORMAT" exit, reloaded \n", CHKID_ARG(&lease_check->chkid));
                GOTO(err_lock, ret);
        }

        ret = volume_proto_renew(cent->value);
        if (unlikely(ret)) {
                mcache_drop_nolock(cent);
                DINFO("renew "CHKID_FORMAT" fail \n", CHKID_ARG(&lease_check->chkid));
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        if (i % 100  == 0) {
                ytime_t now = ytime_gettime();
                DINFO("renew "CHKID_FORMAT" success %ju time %ju seconds\n",
                      CHKID_ARG(&lease_check->chkid), i, (now - lease_check->time) / USECONDS_PER_SEC);
                lease_check->time = now;
        }

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Body of the "volume_lease" task.
 *
 * Sleeps for half of gloconf.lease_timeout (expressed in microseconds) and
 * then runs one __volume_ctl_lease() round, repeating until a round fails.
 * On exit the lease_check_t passed in `arg` is freed.
 */
STATIC void __volume_ctl_lease_worker(void *arg)
{
        int ret;
        uint64_t round = 0;
        lease_check_t *lease_check = arg;

        DINFO("start "CHKID_FORMAT" lease worker, magic %x\n",
                        CHKID_ARG(&lease_check->chkid), lease_check->magic);

        /*
         * schedule does not check a task's total execution time; it checks
         * the time a task spends waiting in the suspended state.
         */
        for (;;) {
                schedule_sleep("volume_lease", 1000 * 1000 * gloconf.lease_timeout / 2);

                ret = __volume_ctl_lease(lease_check, round);
                if (ret)
                        break;

                round++;
        }

        DINFO("exit "CHKID_FORMAT" lease worker, magic %x\n",
              CHKID_ARG(&lease_check->chkid), lease_check->magic);
        yfree((void **)&lease_check);
}

/*
 * Stamp `volume_proto` with a fresh random magic and start a "volume_lease"
 * task for it.  The task receives a heap-allocated lease_check_t carrying the
 * volume id, that magic and the current time.
 *
 * An allocation failure ends in UNIMPLEMENTED(__DUMP__).
 */
STATIC void __volume_ctl_lease_worker_create(volume_proto_t *volume_proto)
{
        int ret;
        lease_check_t *check;

        ret = ymalloc((void **)&check, sizeof(*check));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        volume_proto->magic = fastrandom();

        check->time = ytime_gettime();
        check->magic = volume_proto->magic;
        check->chkid = volume_proto->chkid;

        DINFO("create "CHKID_FORMAT" lease worker, magic %x\n",
              CHKID_ARG(&check->chkid), check->magic);

        // TODO: this keeps a schedule task slot occupied permanently
        schedule_task_new("volume_lease", __volume_ctl_lease_worker, check, -1);
}

/*
 * Make sure volume `chkid` is present in the volume cache.
 *
 * If the lookup reports ENOENT the volume is loaded: its chunk info is
 * fetched with md_chunk_getinfo1(), volume_proto_load() builds the in-memory
 * object, it is inserted into __vc_cache__ and a lease worker is started.
 * If the insert reports EEXIST the freshly built object is destroyed and the
 * lookup is retried.  If the volume is already cached,
 * __volume_ctl_check_reload() is run on the existing entry instead.
 *
 * When volume_proto_load() fails with ENOSPC, __volume_ctl_diskcheck() is
 * run and, if that succeeds, EREMCHG is returned.
 *
 * @pool    pool name passed through to the metadata/load calls
 * @parent  parent id passed through to the metadata/load calls
 * @chkid   id of the volume; chkid->id must be non-zero (asserted)
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_ctl_load_bh(const char *pool, const poolid_t *parent, const chkid_t *chkid)
{
        int ret;
        /*
         * Start from NULL: on the fresh-load path the lookup fails, yet the
         * final log line still prints this pointer.
         */
        mcache_entry_t *cent = NULL;
        volume_proto_t *volume_proto = NULL;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        YASSERT(chkid->id);

retry:
        ret = __volume_ctl_get____(&cent, chkid);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DINFO("load "CHKID_FORMAT" start\n", CHKID_ARG(chkid));
                        ret = md_chunk_getinfo1(pool, parent, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ANALYSIS_BEGIN(0);
                        ret = volume_proto_load(&volume_proto, pool, parent, chkinfo);
                        ANALYSIS_END(0, IO_WARN, "volume_proto_load");
                        if (unlikely(ret)) {
                                if (ret == ENOSPC) {
                                        ret = __volume_ctl_diskcheck(pool, parent, chkid);
                                        if (unlikely(ret))
                                                GOTO(err_ret, ret);

                                        ret = EREMCHG;
                                        GOTO(err_ret, ret);
                                } else
                                        GOTO(err_ret, ret);
                        }

                        ret = mcache_insert(__vc_cache__, chkid, volume_proto);
                        if (unlikely(ret)) {
                                /*
                                 * The cache did not take the object, so it is
                                 * still ours to destroy, whatever the error.
                                 */
                                volume_proto_destroy(volume_proto);
                                volume_proto = NULL;

                                if (ret == EEXIST) {
                                        DWARN("load "CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                                        goto retry;
                                } else
                                        GOTO(err_ret, ret);
                        }

                        __volume_ctl_lease_worker_create(volume_proto);
                } else
                        GOTO(err_ret, ret);
        } else {
                DINFO("chunk "CHKID_FORMAT" reload %p\n", CHKID_ARG(chkid), cent);

                ret = __volume_ctl_check_reload(pool, cent, parent, chkid);
                if (unlikely(ret))
                        GOTO(err_release, ret);

                __volume_ctl_release(cent);
        }

        DINFO("load "CHKID_FORMAT" finish %p\n", CHKID_ARG(chkid), cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        DINFO("load "CHKID_FORMAT" fail\n", CHKID_ARG(chkid));
        return ret;
}

/*
 * Unlink every waiter queued on arg->list and resume its task with `retval`.
 * Each list node is treated as the start of a co_wait_task_t.  `chkid` is
 * unused.
 */
STATIC void  __volume_load_bh_resume(const chkid_t *chkid, arg_t *arg, int retval)
{
        struct list_head *cur, *next;

        (void) chkid;

        list_for_each_safe(cur, next, &arg->list) {
                co_wait_task_t *waiter = (void *)cur;

                list_del(cur);
                schedule_resume(&waiter->task, retval, NULL);
        }
}

/*
 * Finish a load request: under worker->lock, remove the queued request for
 * `chkid` from worker->queue and resume all of its waiters with `retval`;
 * then free the request.
 *
 * The entry removed from the queue must be the same object as `_arg`
 * (asserted).  `_arg` is freed on every path, including the error paths; on
 * those paths the waiters are not resumed by this function.
 *
 * Returns 0 on success, otherwise the lock or squeue_remove() error.
 */
STATIC int  __volume_load_bh_cleanup(const chkid_t *chkid, arg_t *_arg, int retval)
{
        int ret;
        arg_t *arg;

        (void) chkid;

        ret = sy_spin_lock(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = squeue_remove(&worker->queue, chkid, (void **)&arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_load_bh_resume(chkid, arg, retval);

        sy_spin_unlock(&worker->lock);

        YASSERT(_arg == arg);

        yfree((void **)&_arg);

        return 0;
err_lock:
        sy_spin_unlock(&worker->lock);
err_ret:
        yfree((void **)&_arg);
        return ret;
}

/*
 * Report a failed load attempt: resume the waiters currently queued on `arg`
 * with `retval` while holding worker->lock.  The request itself stays queued.
 *
 * Returns 0 on success or the sy_spin_lock() error.
 */
STATIC int  __volume_load_bh_fail(const chkid_t *chkid, arg_t *arg, int retval)
{
        int ret;

        DWARN("load "CHKID_FORMAT" fail\n", CHKID_ARG(chkid));

        ret = sy_spin_lock(&worker->lock);
        if (likely(ret == 0)) {
                __volume_load_bh_resume(chkid, arg, retval);
                sy_spin_unlock(&worker->lock);
                return 0;
        }

        GOTO(err_ret, ret);
err_ret:
        return ret;
}


/*
 * Task body that loads the volume described by `_arg` (an arg_t) and wakes
 * up everybody waiting for it.
 *
 * __volume_ctl_load_bh() is attempted; on failure:
 *   - EREMCHG, ENOENT, or more than gloconf.lease_timeout elapsed since the
 *     task started: give up.  The request is cleaned up and waiters get
 *     ENOENT or EREMCHG unchanged, and ESTALE for any other error.
 *   - otherwise: the current waiters are resumed with ESTALE and
 *     USLEEP_RETRY() is invoked (presumably sleeps and jumps back to the
 *     retry label, bailing out to err_ret once its retry budget is spent --
 *     confirm against the macro definition).
 * On success the request is cleaned up and waiters are resumed with 0.
 *
 * Cleanup failures are fatal (asserted).
 */
STATIC void __volume_load_bh(void *_arg)
{
        int ret, retry = 0;
        arg_t *arg;
        time_t now, begin = gettime();

        arg = _arg;

retry:
        ret = __volume_ctl_load_bh(arg->pool, &arg->parent, &arg->chkinfo.id);
        if (ret) {
                now = gettime();
                if (ret == EREMCHG || ret == ENOENT
                    || now - begin > gloconf.lease_timeout) {
                        DWARN("load "CHKID_FORMAT" total %u (%u) %s\n",
                              CHKID_ARG(&arg->chkinfo.id), (int)(now - begin), ret, strerror(ret));
                        GOTO(err_ret, ret);
                } else {
                        __volume_load_bh_fail(&arg->chkinfo.id, arg,
                                                ret == EREMCHG ? ret : ESTALE);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 100, (100 * 1000));
                }
        }

        ret = __volume_load_bh_cleanup(&arg->chkinfo.id, arg, 0);
        YASSERT(ret == 0);

        return;
err_ret:
        /* ENOENT and EREMCHG are passed to the waiters as-is; the rest becomes ESTALE */
        if (ret == ENOENT) {
                ret = __volume_load_bh_cleanup(&arg->chkinfo.id, arg,
                                               ret);
        } else {
                ret = __volume_load_bh_cleanup(&arg->chkinfo.id, arg,
                                               ret == EREMCHG ? ret : ESTALE);
        }

        YASSERT(ret == 0);
}

static uint32_t __key_from_int(const void *i)
{
        return ((chkid_t *)i)->id;
}

/*
 * squeue match callback: `key` is a chkid_t, `data` is a squeue_entry_t whose
 * payload is an arg_t.  Returns non-zero when the key equals the id stored in
 * the queued request's chkinfo.
 */
static int __equal(const void *key, const void *data)
{
        const chkid_t *wanted = key;
        const squeue_entry_t *sent = data;
        arg_t *queued = sent->ent;

        DBUG("chkid "CHKID_FORMAT" --- "CHKID_FORMAT"\n", CHKID_ARG(wanted),
             CHKID_ARG(&queued->chkinfo.id));

        return chkid_cmp(wanted, &queued->chkinfo.id) == 0;
}

/*
 * Allocate the file-global `worker` and initialise its spin lock and its
 * squeue (1024 is passed as the size argument; matching and hashing go
 * through __equal / __key_from_int).
 *
 * Returns 0 on success.  If the lock or queue initialisation fails, `worker`
 * is freed again and the error is returned.
 */
STATIC int __volume_ctl_loader_init()
{
        int ret;

        ret = ymalloc((void **)&worker, sizeof(*worker));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_spin_init(&worker->lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = squeue_init(&worker->queue, 1024, __equal, __key_from_int);
        if (unlikely(ret))
                GOTO(err_free, ret);

#if 0
        /* disabled: dedicated semaphore-driven "volume_load" worker */
        ret = worker_create(&worker->sem, "volume_load", __volume_load_worker,
                            NULL, NULL, WORKER_TYPE_SEM, 0);
        if (unlikely(ret))
                GOTO(err_free, ret);
#endif

        return 0;
err_free:
        yfree((void **)&worker);
err_ret:
        return ret;
}

/**
 * mcache_iterator() callback: records the chunk id of one cached volume.
 *
 * A recycle_entry_t is allocated, filled with the volume's chkid and
 * linked onto recycle->list. If the allocation fails the volume is
 * silently skipped.
 *
 * @param arg  unused
 * @param ent  the mcache_entry_t being visited; its value is read as a
 *             volume_proto_t
 */
STATIC void __volume_recycle_collect(void *arg, void *ent)
{
        mcache_entry_t *entry = ent;
        volume_proto_t *volume_proto = entry->value;
        recycle_entry_t *item;
        int ret;

        (void) arg;

        ret = ymalloc((void **)&item, sizeof(*item));
        if (unlikely(ret)) {
                return;
        }

        item->chkid = volume_proto->chkid;
        list_add(&item->hook, &recycle->list);
}

STATIC int __volume_recycle_handle()
{
#if 0
        int ret;
        recycle_entry_t *rent, *tmp;
        chkid_t *chkid;
        mcache_entry_t *cent;

        list_for_each_entry_safe(rent, tmp, &recycle->list, hook) {
                chkid = &rent->chkid;

                ret = __volume_ctl_get(&cent, chkid);
                if (!ret) {
                        __volume_ctl_release(cent);
                }

                list_del(&rent->hook);
                yfree((void **)&rent);
        }
#endif

        return 0;
}

/**
 * Callback of the "volume_recycle" timer.
 *
 * Visits every entry of __vc_cache__ with __volume_recycle_collect, runs
 * __volume_recycle_handle(), then re-arms the timer with USEC_PER_DAY.
 * If the handle step fails the timer is not re-armed.
 *
 * @param _arg  unused
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __volume_recycle_worker(void *_arg)
{
        int ret;

        (void) _arg;

        mcache_iterator(__vc_cache__, __volume_recycle_collect, NULL);

        ret = __volume_recycle_handle();
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = timer1_settime(&recycle->handler, USEC_PER_DAY);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Set up the recycle service: allocate the file-level recycle state,
 * initialise its list, create the "volume_recycle" timer with
 * __volume_recycle_worker as callback and arm it with USEC_PER_DAY.
 *
 * NOTE(review): the allocation is not released when timer1_create() or
 * timer1_settime() fails; confirm whether callers treat that as fatal.
 *
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __volume_ctl_recycle_init()
{
        int ret;

#if 0
        DERROR("file recycle service disabled\n");
        return 0;
#endif

        ret = ymalloc((void **)&recycle, sizeof(*recycle));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        INIT_LIST_HEAD(&recycle->list);

        ret = timer1_create(&recycle->handler, "volume_recycle", __volume_recycle_worker, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = timer1_settime(&recycle->handler, USEC_PER_DAY);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Request the load of one volume and suspend the calling task until it
 * is resumed.
 *
 * Under worker->lock the queue is searched for a pending load with the
 * same chunk id. If none exists, a new arg_t describing the load is
 * queued and a "volume_load" task running __volume_load_bh is spawned;
 * otherwise the caller only joins the wait list of the pending load.
 * After unlocking, the task yields in "volume_loading".
 *
 * @param pool      pool name, copied into the new arg_t
 * @param parent    parent pool id, copied into the new arg_t
 * @param _chkinfo  chunk info of the volume; CHKINFO_SIZE(repnum) bytes
 *                  are copied into the new arg_t
 * @return 0 on success; otherwise the error from sy_spin_lock(),
 *         squeue_get(), ymalloc(), squeue_insert() or schedule_yield()
 */
STATIC int __volume_load_wait(const char *pool, const poolid_t *parent, const chkinfo_t *_chkinfo)
{
        int ret;
        arg_t *arg;
        co_wait_task_t wait_arg;

        ret = sy_spin_lock(&worker->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("require load %s\n", id2str(&_chkinfo->id));

        // Guarantee a single load per volume; other concurrent tasks join the wait queue
        ret = squeue_get(&worker->queue, &_chkinfo->id, (void **)&arg);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        // no load pending for this id: describe one and queue it
                        ret = ymalloc((void**)&arg, sizeof(*arg));
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        // NOTE(review): unbounded copy into arg->pool (MAX_NAME_LEN);
                        // assumes the caller's pool string fits -- confirm
                        strcpy(arg->pool, pool);
                        arg->parent = *parent;
                        memcpy(&arg->chkinfo, _chkinfo, CHKINFO_SIZE(_chkinfo->repnum));
                        INIT_LIST_HEAD(&arg->list);

                        ret = squeue_insert(&worker->queue, &_chkinfo->id, arg, 0);
                        if (unlikely(ret))
                                GOTO(err_free, ret);

                        // register the current task as a waiter before the loader is started
                        wait_arg.task = schedule_task_get();
                        list_add(&wait_arg.hook, &arg->list);

                        YASSERT(variable_thread() >= gloconf.main_loop_threads);
                        schedule_task_new("volume_load", __volume_load_bh, arg, -1);
                        //worker_post(&worker->sem);
                } else
                        GOTO(err_lock, ret);
        } else {
                // a load for this id is already queued: only add ourselves as a waiter
                wait_arg.task = schedule_task_get();
                list_add(&wait_arg.hook, &arg->list);
        }

        sy_spin_unlock(&worker->lock);

        ANALYSIS_BEGIN(0);

        // wait_arg lives on this stack frame and is linked into arg->list while
        // the task is suspended; presumably the loader unlinks and resumes it.
        // NOTE(review): if schedule_yield() fails, this returns with wait_arg
        // possibly still linked -- confirm that cannot happen
        ret = schedule_yield("volume_loading", NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 100, id2str(&_chkinfo->id));

        return 0;
err_free:
        yfree((void **)&arg);
err_lock:
        sy_spin_unlock(&worker->lock);
err_ret:
        return ret;
}

/**
 * Try to load the volume identified by chkid on this node.
 *
 * Steps: look up the volume's parent and pool, fetch its chunk info,
 * check that the first disk listed in the chunk info is local, then wait
 * for the load through __volume_load_wait().
 *
 * @param chkid    volume chunk id; its type must be __VOLUME_CHUNK__
 * @param _parent  optional alternative parent (may be NULL); used once as
 *                 a fallback when the recorded parent yields ENOENT
 * @return 0 on success; EREMCHG when the parent lookup reports ENOENT or
 *         the first disk is not local; otherwise the error of the failing
 *         call
 */
STATIC int __volume_ctl_try_load(const chkid_t *chkid, const fileid_t *_parent)
{
        int ret, retry = 0;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX], pool[MAX_NAME_LEN];
        poolid_t parent;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __VOLUME_CHUNK__);

        DBUG("try load volume %s\n", id2str(chkid));

retry:
        ret = replica_srv_getparent(chkid, &parent, pool);
        if (unlikely(ret)) {
                // an unknown parent is reported to the caller as EREMCHG
                if (ret == ENOENT)
                        ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        // chkinfo points into the stack buffer _chkinfo
        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo1(pool, &parent, chkid, chkinfo, NULL);
        if (unlikely(ret)) {
                DBUG("load "CHKID_FORMAT" parent "CHKID_FORMAT", ret: %d\n",
                     CHKID_ARG(chkid), CHKID_ARG(&parent), ret);

                // not found under the recorded parent, and the caller supplied a
                // different one: look there, record it as the parent, start over
                if (ret == ENOENT && _parent && chkid_cmp(&parent, _parent)) {
                        // this fallback is expected to be taken at most once
                        YASSERT(retry == 0);

                        DWARN("try load "CHKID_FORMAT" from "CHKID_FORMAT"\n",
                              CHKID_ARG(chkid), CHKID_ARG(_parent));

                        ret = md_chunk_getinfo1(pool, _parent, chkid, chkinfo, NULL);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = replica_srv_setparent(chkid, _parent);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        retry++;
                        goto retry;
                }

                GOTO(err_ret, ret);
        }

        // only proceed when the first disk listed in the chunk info is local
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        ret = __volume_load_wait(pool, &parent, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        CHKINFO_DUMP(chkinfo, D_INFO);

        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);

        return 0;
err_ret:
        ANALYSIS_ASSERT(0, 1000 * 1000 * (_get_rpc_timeout() * 3), NULL);
        return ret;
}

/**
 * Apply an xattr change to the volume's IOPS token bucket.
 *
 * Does nothing unless key equals THROT_IOPS; in that case buf is handed
 * to throt_qos_set() for volume_proto->iops_bucket and the resulting
 * bucket state is logged.
 *
 * @param volume_proto  volume whose iops_bucket is updated
 * @param key           xattr name
 * @param buf           xattr value, passed to throt_qos_set()
 * @param buflen        unused
 * @return 0 when the key does not match or the update succeeded,
 *         otherwise the error from throt_qos_set()
 */
STATIC int __volume_ctl_iops_bucket_update(volume_proto_t *volume_proto, const char *key, const char *buf, uint32_t buflen)
{
        token_bucket_t *bucket = &volume_proto->iops_bucket;
        int ret;

        (void) buflen;

        if (strcmp(key, THROT_IOPS) != 0) {
                return 0;
        }

        ret = throt_qos_set(bucket, buf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO(""CHKID_FORMAT" %s update: valid:%d throt: %f max: %f\n",
              CHKID_ARG(&volume_proto->chkid),
              THROT_IOPS,
              bucket->inited,
              bucket->rate,
              bucket->burst_max);

        return 0;
err_ret:
        return ret;
}

/**
 * Apply an xattr change to the volume's bandwidth token bucket.
 *
 * Does nothing unless key equals THROT_BW; in that case buf is handed to
 * throt_bw_set() for volume_proto->bw_bucket and the resulting bucket
 * state is logged.
 *
 * @param volume_proto  volume whose bw_bucket is updated
 * @param key           xattr name
 * @param buf           xattr value, passed to throt_bw_set()
 * @param buflen        unused
 * @return 0 when the key does not match or the update succeeded,
 *         otherwise the error from throt_bw_set()
 */
STATIC int __volume_ctl_bw_bucket_update(volume_proto_t *volume_proto, const char *key, const char *buf, uint32_t buflen)
{
        token_bucket_t *bw_bucket = &volume_proto->bw_bucket;
        int ret;

        (void) buflen;

        if (strcmp(key, THROT_BW) != 0) {
                return 0;
        }

        ret = throt_bw_set(bw_bucket, buf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DINFO(""CHKID_FORMAT" %s update: valid:%d throt: %f max: %f\n",
              CHKID_ARG(&volume_proto->chkid),
              THROT_BW,
              bw_bucket->inited,
              bw_bucket->rate,
              bw_bucket->burst_max);

        return 0;
err_ret:
        return ret;
}

/**
 * Apply an xattr change to the volume's latency-analysis setting.
 *
 * Does nothing unless key equals ANALYSIS_LATENCY. Otherwise buf is
 * parsed as a decimal integer "times" and latency->valid becomes:
 *   - USEC_PER_SEC / times  when 0 < times < USEC_PER_SEC
 *   - 1                     when times >= USEC_PER_SEC
 *   - 0                     otherwise (including unparsable input)
 *
 * The value is parsed with strtol() rather than atoi(): atoi() has
 * undefined behaviour when the text does not fit in an int, whereas
 * strtol() saturates, so an oversized value lands in the ">=" branch.
 *
 * @param volume_proto  volume whose latency setting is updated
 * @param key           xattr name
 * @param buf           xattr value
 * @param buflen        unused
 * @return always 0
 */
STATIC int __volume_ctl_latency_update(volume_proto_t *volume_proto, const char *key, const char *buf, uint32_t buflen)
{
        long times;
        latency_t *latency;

        (void) buflen;
        latency = &volume_proto->latency;

        if (strcmp(key, ANALYSIS_LATENCY) != 0) {
                goto out;
        }

        times = strtol(buf, NULL, 10);

        if (times > 0 && times < USEC_PER_SEC) {
                latency->valid = USEC_PER_SEC/times;
        } else if (times >= USEC_PER_SEC) {
                latency->valid = 1;
        } else {
                latency->valid = 0;
        }

        DINFO(""CHKID_FORMAT" update latency:%d\n",
                        CHKID_ARG(&volume_proto->chkid), latency->valid);

out:
        return 0;
}

STATIC int __volume_ctl_xattr_set(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *key = va_arg(ap, char *);
        const char *value = va_arg(ap, char *);
        uint32_t valuelen = va_arg(ap, uint32_t);
        int flag = va_arg(ap, int);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->xattr_set(volume_proto, key, value, valuelen, flag);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = __volume_ctl_iops_bucket_update(volume_proto, key, value, valuelen);
        if (ret)
                GOTO(err_lock, ret);

        ret = __volume_ctl_bw_bucket_update(volume_proto, key, value, valuelen);
        if (ret)
                GOTO(err_lock, ret);

        ret = __volume_ctl_latency_update(volume_proto, key, value, valuelen);
        if (ret)
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Set an extended attribute on a volume.
 *
 * Runs __volume_ctl_xattr_set through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_xattr_set(const volid_t *volid, const char *key, const char *value,
                     uint32_t valuelen, int flag)
{
        int ret = core_request(core_hash(volid), -1, "xattr_set", __volume_ctl_xattr_set,
                               volid, key, value, valuelen, flag);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_xattr_get().
 *
 * Variadic arguments, in order: volid, key, value, valuelen.
 * Calls the volume's xattr_get method with the volume entry read-locked;
 * the entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_xattr_get(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *key = va_arg(ap, char *);
        char *value = va_arg(ap, char *);
        int *valuelen = va_arg(ap, int *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->xattr_get(volume_proto, key, value, valuelen);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}



/**
 * Read an extended attribute of a volume.
 *
 * Runs __volume_ctl_xattr_get through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_xattr_get(const volid_t *volid, const char *key, char *value, int *valuelen)
{
        int ret = core_request(core_hash(volid), -1, "xattr_get", __volume_ctl_xattr_get,
                               volid, key, value, valuelen);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_xattr_list().
 *
 * Variadic arguments, in order: volid, buf, buflen.
 * Calls the volume's xattr_list method with the volume entry read-locked;
 * the entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
int __volume_ctl_xattr_list(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        char *buf = va_arg(ap, char *);
        int *buflen = va_arg(ap, int *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->xattr_list(volume_proto, buf, buflen);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * List the extended attributes of a volume.
 *
 * Runs __volume_ctl_xattr_list through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_xattr_list(const volid_t *volid, char *buf, int *buflen)
{
        int ret = core_request(core_hash(volid), -1, "xattr_list", __volume_ctl_xattr_list,
                               volid, buf, buflen);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __volume_ctl_xattr_remove(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *key = va_arg(ap, char *);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->xattr_remove(volume_proto, key);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = __volume_ctl_iops_bucket_update(volume_proto, key, "0", 1);
        if (ret)
                GOTO(err_lock, ret);

        ret = __volume_ctl_bw_bucket_update(volume_proto, key, "0", 1);
        if (ret)
                GOTO(err_lock, ret);

        ret = __volume_ctl_latency_update(volume_proto, key, "0", 1);
        if (ret)
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Remove an extended attribute from a volume.
 *
 * Runs __volume_ctl_xattr_remove through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_xattr_remove(const volid_t *volid, const char *key)
{
        int ret = core_request(core_hash(volid), -1, "xattr_remove", __volume_ctl_xattr_remove,
                               volid, key);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Move a chunk while holding the volume entry's write lock; used by
 * __volume_ctl_chunk_move() when the chunk is the volume itself.
 *
 * @param cent        cache entry of the volume (reference held by caller)
 * @param chkid       chunk to move
 * @param dist        target nodes
 * @param dist_count  number of entries in dist
 * @return 0 on success; ECANCELED when the volume's file attributes carry
 *         __FILE_ATTR_DELETE__; otherwise the error from the lock or from
 *         the volume's chunk_move method
 */
STATIC int __volume_ctl_chunk_move_self(mcache_entry_t *cent, const chkid_t *chkid,
                                        const nid_t *dist, int dist_count)
{
        volume_proto_t *volume_proto;
        int deleting;
        int ret;

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = cent->value;

        /* refuse to move anything belonging to a volume marked for deletion */
        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->chunk_move(volume_proto, chkid, dist, dist_count);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /*
         * When the moved chunk is the volume's primary replica, the volume
         * will be loaded on another node, so this node gives up its lease
         * and drops the cached entry.
         * NOTE(review): cent is unlocked below, after mcache_drop_nolock();
         * confirm the entry stays valid while the caller holds a reference.
         */
        if (is_volume(chkid)) {
                DINFO("volume controller moved, free lease\n");
                lease_free(&volume_proto->lease);
                mcache_drop_nolock(cent);
        }

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * Move a chunk while holding the volume entry's read lock; used by
 * __volume_ctl_chunk_move() when the chunk is not the volume itself.
 *
 * @param cent        cache entry of the volume (reference held by caller)
 * @param chkid       chunk to move
 * @param dist        target nodes
 * @param dist_count  number of entries in dist
 * @return 0 on success; ECANCELED when the volume's file attributes carry
 *         __FILE_ATTR_DELETE__; otherwise the error from the lock or from
 *         the volume's chunk_move method
 */
STATIC int __volume_ctl_chunk_move__(mcache_entry_t *cent, const chkid_t *chkid,
                                     const nid_t *dist, int dist_count)
{
        volume_proto_t *volume_proto;
        int deleting;
        int ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = cent->value;

        /* refuse to move anything belonging to a volume marked for deletion */
        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->chunk_move(volume_proto, chkid, dist, dist_count);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_chunk_move().
 *
 * Variadic arguments, in order: volid, chkid, dist, dist_count.
 * Moving the volume controller itself goes through the write-locking
 * __volume_ctl_chunk_move_self(); any other chunk goes through the
 * read-locking __volume_ctl_chunk_move__(). The entry reference is
 * released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_chunk_move(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const nid_t *dist = va_arg(ap, nid_t *);
        int dist_count = va_arg(ap, int);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (is_volume(chkid)) {
                ret = __volume_ctl_chunk_move_self(entry, chkid, dist, dist_count);
        } else {
                ret = __volume_ctl_chunk_move__(entry, chkid, dist, dist_count);
        }

        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Move one chunk of a volume to the given nodes.
 *
 * Runs __volume_ctl_chunk_move through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_chunk_move(const volid_t *volid, const chkid_t *chkid, const nid_t *dist, int dist_count)
{
        int ret = core_request(core_hash(volid), -1, "chunk_move", __volume_ctl_chunk_move,
                               volid, chkid, dist, dist_count);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Public wrapper around __volume_ctl_get(); note the swapped argument
 * order.
 *
 * @param volid  volume to look up
 * @param cent   receives the cache entry
 * @return the result of __volume_ctl_get()
 */
int IO_FUNC volume_ctl_get(const volid_t *volid, mcache_entry_t **cent)
{
        int ret = __volume_ctl_get(cent, volid);

        return ret;
}

/**
 * Public wrapper around __volume_ctl_release().
 *
 * @param cent  cache entry to release
 */
void volume_ctl_release(mcache_entry_t *cent)
{
        __volume_ctl_release(cent);

        return;
}

/**
 * Move the volume itself: calls volume_ctl_chunk_move() with volid as
 * both the volume id and the chunk id.
 *
 * @param volid  volume to move
 * @param nid    target nodes
 * @param count  number of entries in nid
 * @return 0 on success, otherwise the error from volume_ctl_chunk_move()
 */
int volume_ctl_move(const volid_t *volid, const nid_t *nid, int count)
{
        int ret = volume_ctl_chunk_move(volid, volid, nid, count);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_localize().
 *
 * Variadic arguments, in order: volid, idx.
 * Builds a chunk id from (volid, idx) with fid2cid() and passes it to the
 * volume's chunk_localize method with the volume entry read-locked; the
 * entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_localize(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        int idx = va_arg(ap, int);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        chkid_t chkid;
        int ret;

        va_end(ap);

        fid2cid(&chkid, volid, idx);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->chunk_localize(volume_proto, &chkid);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Localize chunk number idx of a volume.
 *
 * Runs __volume_ctl_localize through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_localize(const volid_t *volid, int idx)
{
        int ret = core_request(core_hash(volid), -1, "localize", __volume_ctl_localize,
                               volid, idx);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_stat().
 *
 * Variadic arguments, in order: volid, filestat, off, size.
 * Calls the volume's stat method with the volume entry read-locked; the
 * entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_stat(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        filestat_t *filestat = va_arg(ap, filestat_t *);
        off_t off = va_arg(ap, off_t);
        size_t size = va_arg(ap, size_t);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->stat(volume_proto, filestat, off, size);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Collect statistics for a range of a volume into filestat.
 *
 * Runs __volume_ctl_stat through core_request(), using core_hash(volid)
 * as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_stat(const volid_t *volid, filestat_t *filestat, off_t off, size_t size)
{
        int ret = core_request(core_hash(volid), -1, "stat", __volume_ctl_stat,
                               volid, filestat, off, size);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#ifdef USE_ROW2
/**
 * @brief Check whether the volume has been loaded successfully.
 * @note Must not take the volume lock and must not trigger a load; the
 *       entry is obtained with __volume_ctl_get____() and handed to
 *       volume_proto_check_ready().
 *
 * @param ap  variadic arguments: volid
 * @return 0 when the volume is ready, otherwise the error of the failing
 *         step
 */
STATIC int __volume_ctl_check_ready(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        // TODO
        ret = __volume_ctl_get____(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto_check_ready(volume_proto);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Check whether a volume is ready.
 *
 * Runs __volume_ctl_check_ready through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_check_ready(const volid_t *volid)
{
        int ret = core_request(core_hash(volid), -1, "check_ready", __volume_ctl_check_ready, volid);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#endif

/**
 * core_request() handler behind volume_ctl_check_exists().
 *
 * Variadic arguments, in order: volid, exists.
 * *exists is cleared first and set to 1 only when __volume_ctl_get____()
 * finds the entry. Only existence is checked: no extra work is done and
 * no attempt is made to load the volume.
 *
 * @return 0 when the entry was found, otherwise the error from
 *         __volume_ctl_get____() (with *exists left at 0)
 */
STATIC int __volume_ctl_check_exists(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        int *exists = va_arg(ap, int *);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        *exists = 0;

        ret = __volume_ctl_get____(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *exists = 1;

        __volume_ctl_release(entry);

        return 0;
err_ret:
        return ret;
}

/**
 * Report through *exists whether a volume entry is present.
 *
 * Runs __volume_ctl_check_exists through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_check_exists(const volid_t *volid, int *exists)
{
        int ret = core_request(core_hash(volid), -1, "check_exists", __volume_ctl_check_exists, volid, exists);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_snapshot_create().
 *
 * Variadic arguments, in order: volid, name, p, _site.
 * Calls the volume's snapshot_create method (with a trailing 0 argument)
 * while the volume entry is write-locked; the entry is unlocked and
 * released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_snapshot_create(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        int p = va_arg(ap, int);
        const char *_site = va_arg(ap, char *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_wrlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_create(volume_proto, name, p, _site, 0);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Create a snapshot of a volume.
 *
 * Names containing '/' are rejected with EINVAL. Otherwise
 * __volume_ctl_snapshot_create is run through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success; EINVAL for an invalid name; otherwise the error
 *         returned by core_request()
 */
int volume_ctl_snapshot_create(const volid_t *volid, const char *name, int p, const char *_site)
{
        int ret;

        if (strchr(name, '/') != NULL) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = core_request(core_hash(volid), -1, "snapshot_create", __volume_ctl_snapshot_create,
                           volid, name, p, _site);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_snapshot_listopen().
 *
 * Variadic arguments, in order: volid, uuid.
 * Calls the volume's snapshot_listopen method with the volume entry
 * read-locked; the entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_snapshot_listopen(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *uuid = va_arg(ap, char *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_listopen(volume_proto, uuid);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Open a snapshot listing session identified by uuid.
 *
 * Runs __volume_ctl_snapshot_listopen through core_request(), using
 * core_hash(volid) as the target.
 * NOTE(review): the request is labelled "snapshot_list", the same label
 * volume_ctl_snapshot_list() uses; confirm whether that is intentional.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_snapshot_listopen(const volid_t *volid, const char *uuid)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_list", __volume_ctl_snapshot_listopen,
                               volid, uuid);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_snapshot_list().
 *
 * Variadic arguments, in order: volid, uuid, offset, de, delen.
 * Calls the volume's snapshot_list method with the volume entry
 * read-locked; the entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_snapshot_list(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *uuid = va_arg(ap, const char *);
        uint64_t offset = va_arg(ap, uint64_t);
        void *de = va_arg(ap, void *);
        int *delen = va_arg(ap, int *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_list(volume_proto, uuid, offset, de, delen);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Read snapshot list entries of a volume into de, starting at offset,
 * for the listing session identified by uuid.
 *
 * Runs __volume_ctl_snapshot_list through core_request(), using
 * core_hash(volid) as the target.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_snapshot_list(const chkid_t *volid, const char *uuid, uint64_t offset, void *de, int *delen)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_list", __volume_ctl_snapshot_list,
                               volid, uuid, offset, de, delen);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_snapshot_listclose().
 *
 * Variadic arguments, in order: volid, uuid.
 * Calls the volume's snapshot_listclose method with the volume entry
 * read-locked; the entry is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_snapshot_listclose(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *uuid = va_arg(ap, char *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_listclose(volume_proto, uuid);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Close the snapshot listing session identified by uuid.
 *
 * Runs __volume_ctl_snapshot_listclose through core_request(), using
 * core_hash(volid) as the target.
 * NOTE(review): the request is labelled "snapshot_list", the same label
 * volume_ctl_snapshot_list() uses; confirm whether that is intentional.
 *
 * @return 0 on success, otherwise the error returned by core_request()
 */
int volume_ctl_snapshot_listclose(const volid_t *volid, const char *uuid)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_list", __volume_ctl_snapshot_listclose,
                               volid, uuid);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * core_request() handler behind volume_ctl_snapshot_last().
 *
 * Variadic arguments, in order: volid, snapnid, snapid, snapname,
 * snap_version. Calls the volume's snapshot_last method, which receives
 * the four output pointers, with the volume entry read-locked; the entry
 * is unlocked and released on every path.
 *
 * @return 0 on success, otherwise the error of the first failing step
 */
STATIC int __volume_ctl_snapshot_last(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        nid_t *snapnid = va_arg(ap, nid_t *);
        volid_t *snapid = va_arg(ap, volid_t *);
        char *snapname = va_arg(ap, char *);
        uint64_t *snap_version = va_arg(ap, uint64_t *);
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_last(volume_proto, snapnid, snapid,
                                          snapname, snap_version);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Public entry point: run __volume_ctl_snapshot_last() for @volid through
 * core_request(), keyed by core_hash(volid).
 *
 * @snapnid, @snapid, @snapname and @snap_version are output pointers that
 * are handed to the volume's snapshot_last() method by the handler.
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_snapshot_last(const volid_t *volid, nid_t *snapnid,
                             volid_t *snapid, char *snapname, uint64_t *snap_version)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_last",
                               __volume_ctl_snapshot_last,
                               volid, snapnid, snapid, snapname, snap_version);

        if (ret == 0)
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Report whether replica_srv_getparent() can resolve a parent for @volid.
 *
 * Returns 1 when the lookup succeeds and 0 on any failure; the error code
 * itself is discarded.
 */
STATIC int __volume_ctl_exist(const volid_t *volid)
{
        poolid_t parent;

        return (replica_srv_getparent(volid, &parent, NULL) == 0) ? 1 : 0;
}

/*
 * First iteration stage: visit the volume's own table chunks, then let the
 * volume walk its second level.
 *
 * Under the read lock of @cent:
 *   - *chknum receives table2's chunk count,
 *   - @func2 is called for the table1 proto chunk and for every populated
 *     table1 extension slot (empty slots are skipped),
 *   - the volume's chunk_iterator1() method is invoked with @func2/@arg.
 *
 * Returns 0 on success, or the error from locking / chunk_iterator1().
 * The reference on @cent is left to the caller.
 */
STATIC int __volume_ctl_chunk_iterator1__(mcache_entry_t *cent, func2_t func2, void *arg, uint32_t *chknum)
{
        int ret, slot;
        volume_proto_t *volume_proto;
        table1_t *l1;
        table2_t *l2;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        l1 = &volume_proto->table1;
        l2 = &volume_proto->table2;
        *chknum = l2->chknum;

        YASSERT(is_volume(&volume_proto->chkid));

        /* level 1: the proto chunk first, then each extension chunk */
        func2(arg, &volume_proto->chkid, l1->table_proto->chkinfo);

        for (slot = 0; slot < l1->ext_info.chunk_count; slot++) {
                if (l1->ext[slot])
                        func2(arg, &volume_proto->chkid, l1->ext[slot]->chkinfo);
        }

        /* level 2 is delegated to the volume implementation */
        ret = volume_proto->chunk_iterator1(volume_proto, func2, arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * Second iteration stage: run the volume's chunk_iterator2() method for
 * index @idx while holding the read lock of @cent.
 *
 * ENOENT from the method is returned through a plain goto (no GOTO());
 * every other error goes through GOTO(). The lock is dropped on all paths;
 * the reference on @cent is left to the caller.
 *
 * Returns 0 on success or the error code.
 */
STATIC int __volume_ctl_chunk_iterator2__(mcache_entry_t *cent, func2_t func2, void *arg, uint64_t idx)
{
        volume_proto_t *volume_proto;
        int ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_iterator2(volume_proto, func2, arg, idx);
        if (ret == ENOENT)
                goto err_lock;
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_END(0, IO_WARN, "__volume_ctl_chunk_iterator2__");

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * First stage of the "unintact" scan: report the volume's own table
 * chunks, then let the volume scan its second level.
 *
 * Under the read lock of @cent:
 *   - *chknum receives table2's chunk count,
 *   - @func3 is called (with a NULL last argument) for the table1 proto
 *     chunk and for every populated table1 extension slot,
 *   - the volume's chunk_unintact1() method is invoked with @func3/@arg.
 *
 * Returns 0 on success, or the error from locking / chunk_unintact1().
 * The reference on @cent is left to the caller.
 */
STATIC int __volume_ctl_chunk_unintact1__(mcache_entry_t *cent, func3_t func3, void *arg, uint32_t *chknum)
{
        int ret, i;
        volume_proto_t *volume_proto;
        table2_t *table2;
        table1_t *table1;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        table1 = &volume_proto->table1;
        table2 = &volume_proto->table2;
        *chknum = table2->chknum;

        YASSERT(is_volume(&volume_proto->chkid));

        // L1
        func3(arg, &volume_proto->chkid, table1->table_proto->chkinfo, NULL);

        for (i = 0; i < table1->ext_info.chunk_count; i++) {
                /*
                 * An extension slot may be empty; skip it instead of
                 * dereferencing NULL, exactly as
                 * __volume_ctl_chunk_iterator1__() does for the same table.
                 */
                if (!table1->ext[i])
                        continue;

                func3(arg, &volume_proto->chkid, table1->ext[i]->chkinfo, NULL);
        }

        // L2
        ret = volume_proto->chunk_unintact1(volume_proto, func3, arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * Second stage of the "unintact" scan: run the volume's chunk_unintact2()
 * method for index @idx with the given @deep flag, holding the read lock
 * of @cent.
 *
 * ENOENT from the method is returned through a plain goto (no GOTO());
 * every other error goes through GOTO(). The lock is dropped on all paths;
 * the reference on @cent is left to the caller.
 *
 * Returns 0 on success or the error code.
 */
STATIC int __volume_ctl_chunk_unintact2__(mcache_entry_t *cent, func3_t func3, void *arg, uint32_t idx, int deep)
{
        volume_proto_t *volume_proto;
        int ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->chunk_unintact2(volume_proto, func3, arg, idx, deep);
        if (ret == ENOENT)
                goto err_lock;
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_END(0, IO_WARN, "__volume_ctl_chunk_unintact2__");

        __volume_ctl_unlock(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        return ret;
}

/*
 * core_request() handler for the first iteration stage.
 *
 * Varargs, in order: volid, func2, arg, chknum (uint32_t *, output).
 * Looks up the volume's cache entry, runs __volume_ctl_chunk_iterator1__()
 * on it and releases the entry again on every exit path.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_chunk_iterator1(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        func2_t func2 = va_arg(ap, func2_t);
        void *arg = va_arg(ap, void *);
        uint32_t *chknum = va_arg(ap, uint32_t *);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* volume + sub-volume level chunks */
        ret = __volume_ctl_chunk_iterator1__(entry, func2, arg, chknum);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * core_request() handler for the second iteration stage.
 *
 * Varargs, in order: volid, func2, arg, index (uint32_t).
 * Looks up the volume's cache entry, runs __volume_ctl_chunk_iterator2__()
 * for that index and releases the entry on every exit path.
 *
 * ENOENT is returned through a plain goto (no GOTO()); other errors go
 * through GOTO().
 *
 * Returns 0 on success or the error code.
 */
STATIC int __volume_ctl_chunk_iterator2(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        const volid_t *volid = va_arg(ap, volid_t *);
        func2_t func2 = va_arg(ap, func2_t);
        void *arg = va_arg(ap, void *);
        /*
         * volume_ctl_chunk_iterator() pushes a uint32_t; read it back with
         * the same type so that it is zero-extended (not sign-extended)
         * when widened to the uint64_t index below.
         */
        uint32_t i = va_arg(ap, uint32_t);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // raw
        ret = __volume_ctl_chunk_iterator2__(cent, func2, arg, i);
        if (unlikely(ret)) {
                if (ret == ENOENT)
                        goto err_release;
                else
                        GOTO(err_release, ret);
        }

        __volume_ctl_release(cent);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * core_request() handler for the first stage of the "unintact" scan.
 *
 * Varargs, in order: volid, func3, arg, chknum (uint32_t *, output).
 * Looks up the volume's cache entry, runs __volume_ctl_chunk_unintact1__()
 * on it and releases the entry again on every exit path.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_chunk_unintact1(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        func3_t func3 = va_arg(ap, func3_t);
        void *arg = va_arg(ap, void *);
        uint32_t *chknum = va_arg(ap, uint32_t *);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* volume + sub-volume level chunks */
        ret = __volume_ctl_chunk_unintact1__(entry, func3, arg, chknum);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Call @func2 for every chunk of volume @volid.
 *
 * Stage 1 (one core_request) visits the table chunks and yields the chunk
 * count. Stage 2 issues one core_request per index produced by
 * chkid_generator(), seeded with that count and
 * FILE_PROTO_EXTERN_ITEM_COUNT; an index answering ENOENT is skipped.
 *
 * Returns 0 on success, or the first error other than ENOENT.
 */
int volume_ctl_chunk_iterator(const volid_t *volid, func2_t func2, void *arg)
{
        int ret;
        uint32_t idx, total;
        generator_t gen;

        ret = core_request(core_hash(volid), -1, "volume_ctl_chunk_iterator1",
                           __volume_ctl_chunk_iterator1, volid, func2, arg, &total);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkid_generator_init(&gen, total, FILE_PROTO_EXTERN_ITEM_COUNT);

        /* the generator signals exhaustion with a non-zero return */
        while (chkid_generator(&gen, &idx) == 0) {
                ret = core_request(core_hash(volid), -1, "volume_ctl_chunk_iterator2",
                                   __volume_ctl_chunk_iterator2, volid, func2, arg, idx);
                if (unlikely(ret) && ret != ENOENT)
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Collect the nodes that conn_online() reports as offline and register
 * them for @volid through volume_ctl_vfm_add().
 *
 * At most VFM_COUNT_MAX offline nodes are collected: offline_node[] has
 * exactly that many slots, so any further offline node is dropped with a
 * warning instead of being written past the end of the array.
 *
 * EBUSY from volume_ctl_vfm_add() is logged and treated as success.
 *
 * Returns 0 on success (including the EBUSY case), otherwise the error
 * from conn_listnode() or volume_ctl_vfm_add().
 */
int volume_ctl_offline_check(const volid_t *volid)
{
        int ret, count, i, offline_count;
        nid_t array[512], *nid, offline_node[VFM_COUNT_MAX];

        ret = conn_listnode(array, &count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        offline_count = 0;
        for (i = 0; i < count; i++) {
                nid = &array[i];

                if (conn_online(nid, -1))
                        continue;

                /* never write beyond offline_node[VFM_COUNT_MAX - 1] */
                if (offline_count >= (int)VFM_COUNT_MAX) {
                        DWARN("vfm set "CHKID_FORMAT" offline nodes exceed %d, rest ignored\n",
                              CHKID_ARG(volid), (int)VFM_COUNT_MAX);
                        break;
                }

                offline_node[offline_count] = *nid;
                offline_count++;
        }

        if (offline_count) {
                DINFO("vfm set "CHKID_FORMAT" offline count %u\n",
                      CHKID_ARG(volid), offline_count);

                ret = volume_ctl_vfm_add(volid, offline_node, offline_count);
                if (unlikely(ret)) {
                        if (ret == EBUSY) {
                                DERROR("vfm set "CHKID_FORMAT" offline count %u ret %d\n",
                                       CHKID_ARG(volid), offline_count, ret);
                                goto out;
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                DINFO("vfm set "CHKID_FORMAT" offline count %u done\n",
                      CHKID_ARG(volid), offline_count);
        }

out:
        return 0;
err_ret:
        return ret;
}

#if ENABLE_RECOVERY_DEEP

/*
 * Second stage of the "unintact" scan for one index (ENABLE_RECOVERY_DEEP
 * build): look up the volume's cache entry, run
 * __volume_ctl_chunk_unintact2__() for index @i with @deep, and release
 * the entry on every exit path.
 *
 * ENOENT is returned through a plain goto (no GOTO()); other errors go
 * through GOTO().
 *
 * Returns 0 on success or the error code.
 */
STATIC int __volume_ctl_chunk_unintact2(const volid_t *volid, func3_t func3,
                                        void *arg, uint32_t i, uint32_t deep)
{
        mcache_entry_t *entry;
        int ret;

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* raw chunk at index i */
        ret = __volume_ctl_chunk_unintact2__(entry, func3, arg, i, deep);
        if (ret == ENOENT)
                goto err_release;
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Scan the chunk index range [@from, @to) of @volid for the "unintact"
 * check.
 *
 * The range is processed in groups of at most RECOVERY_SUBVOL_GROUP_SIZE
 * indexes; inside a group the visiting order comes from chkid_generator().
 * Each index is handed to __volume_ctl_chunk_unintact2():
 *   - ENOENT            -> index skipped,
 *   - EAGAIN (_errno()) -> retried through USLEEP_RETRY (limit 10, 100ms),
 *   - anything else     -> scan aborted with that error.
 *
 * NOTE(review): `retry` is never reset between indexes, so presumably the
 * retry budget is shared by the whole segment -- confirm against
 * USLEEP_RETRY.
 *
 * Returns 0 on success or the error code.
 */
static int __volume_ctl_chunk_unintact_seg__(const volid_t *volid, func3_t func3,
                                           void *arg, uint32_t deep, uint32_t from, uint32_t to)
{
        int ret, retry = 0;
        uint32_t i;
        uint32_t idx = 0;       /* absolute index currently being scanned */
        chkid_t chkid;
        uint32_t left = to - from;
        uint32_t off = 0;
        uint32_t step, count = 0;
        generator_t gen;

        DINFO("scan "CHKID_FORMAT"[%u, %u) begin, deep %u\n", CHKID_ARG(volid), from, to, deep);
        
        YASSERT(to > from);
        
        while (left > 0) {
                step = _min(RECOVERY_SUBVOL_GROUP_SIZE, left);

                chkid_generator_init(&gen, step, FILE_PROTO_EXTERN_ITEM_COUNT);

                while (!chkid_generator(&gen, &i)) {
                        if (count % 10000 == 0) {
                                DINFO("scan "CHKID_FORMAT" [%u, %u) %d/%d\n", CHKID_ARG(volid), from, to, count, to - from);
                        }

                        DBUG("scan "CHKID_FORMAT" [%u, %u) %d/%d\n", CHKID_ARG(volid), from, to, count, to - from);
                        
                        count++;

                        /* i is relative to the current group; make it absolute */
                        idx = off + from + i;

retry:
                        ret = __volume_ctl_chunk_unintact2(volid, func3, arg, idx, deep);
                        if (unlikely(ret)) {
                                if (ret == ENOENT) {
                                        continue;
                                } else {
                                        ret = _errno(ret);
                                        if (ret == EAGAIN)  {
                                                USLEEP_RETRY(err_fail, ret, retry, retry, 10, (100 * 1000));
                                        } else {
                                                GOTO(err_ret, ret);
                                        }
                                }
                        }
                }

                off += step;
                left -= step;
        }

        DINFO("scan "CHKID_FORMAT"[%u, %u) success\n", CHKID_ARG(volid), from, to);
        
        return 0;
err_fail:
        /* report the chunk that was actually scanned, not the group-local i */
        fid2cid(&chkid, volid, idx);
        DWARN("scan "CHKID_FORMAT" fail, ret %u\n",
              CHKID_ARG(&chkid), ret);
err_ret:
        return ret;
}

/* Scan thread count; only referenced by the disabled (#else) thread-count
 * selection in __volume_ctl_chunk_unintact(). */
#define SCAN_THREAD 10

/*
 * Per-thread work description for the parallel "unintact" scan.
 * One instance is filled per scan thread by __volume_ctl_chunk_unintact().
 */
typedef struct {
        volid_t volid;          /* volume being scanned (copied by value) */
        func3_t func3;          /* callback forwarded to the scan */
        void *arg;              /* opaque argument forwarded with func3 */
        uint32_t deep;          /* "deep" flag forwarded to the scan */
        uint32_t from;          /* first chunk index of the segment (inclusive) */
        uint32_t to;            /* end of the segment (exclusive) */
        int retval;             /* result of the segment scan, set by the thread */
        sem_t sem;              /* posted by the thread once retval is set */
} args_t;

/*
 * core_request() handler: unpack the single args_t * vararg and scan the
 * segment it describes with __volume_ctl_chunk_unintact_seg__().
 *
 * Returns whatever the segment scan returns.
 */
static int __volume_ctl_chunk_unintact_seg(va_list ap)
{
        args_t *seg = va_arg(ap, args_t *);

        va_end(ap);

        return __volume_ctl_chunk_unintact_seg__(&seg->volid, seg->func3, seg->arg,
                                                 seg->deep, seg->from, seg->to);
}

/*
 * Thread body for one scan segment.
 *
 * @_args is the args_t describing the segment. The scan is submitted with
 * core_request() at SCHEDULE_PRIORITY1; its result is stored in
 * args->retval before args->sem is posted, after which the thread exits.
 */
static void *__volume_ctl_chunk_unintact__(void *_args)
{
        args_t *seg = _args;

        seg->retval = core_request(core_hash(&seg->volid), SCHEDULE_PRIORITY1,
                                   "__volume_ctl_chunk_unintact__",
                                   __volume_ctl_chunk_unintact_seg, seg);

        /* retval must be visible before the waiter is woken */
        sem_post(&seg->sem);

        pthread_exit(NULL);
}

/*
 * Scan all @chknum chunk indexes of @volid in parallel.
 *
 * The index range [0, chknum) is cut into equal segments (1 thread below
 * 1024 chunks, 5 otherwise); each segment is scanned by its own thread
 * running __volume_ctl_chunk_unintact__(). The function waits for every
 * thread via its semaphore.
 *
 * Returns 0 when every segment succeeded (or when @chknum is 0, in which
 * case there is nothing to scan), EAGAIN when at least one segment failed,
 * or the ymalloc() error.
 */
static int __volume_ctl_chunk_unintact(const volid_t *volid, func3_t func3, void *arg, int deep, uint32_t chknum)
{
        int ret, err = 0;
        args_t *array, *args;
        uint32_t i, step, thread;

        /*
         * An empty range would produce a segment with to == from and trip
         * the assertion below; the non-deep build of
         * volume_ctl_chunk_unintact() also treats 0 chunks as success.
         */
        if (chknum == 0)
                return 0;

#if 1
        if (chknum < 1024) {
                thread = 1;
        } else {
                thread = 5;
        }
#else
        
        if (chknum < 1024) {
                thread = 1;
        } else if (chknum < 1024 * 1024) {
                thread = SCAN_THREAD;
        } else {
                thread = SCAN_THREAD * 5;
        }
#endif

        ret = ymalloc((void**)&array, sizeof(*array) * thread);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        step = _ceil(chknum, thread);
        for (i = 0; i < thread; i++) {
                args = &array[i];
                args->volid = *volid;
                args->func3 = func3;
                args->arg = arg;
                args->deep = deep;
                args->from = step * i;
                /* last segment is clamped to chknum; no running unsigned
                 * remainder that could wrap below zero */
                args->to = _min(step * (i + 1), chknum);
                args->retval = 0;

                YASSERT(args->to > args->from);

                ret = sem_init(&args->sem, 0, 0);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                ret = sy_thread_create2(__volume_ctl_chunk_unintact__, args, "__volume_ctl_chunk_unintact");
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        /* join: every thread posts its semaphore after storing retval */
        for (i = 0; i < thread; i++) {
                args = &array[i];
                ret = sem_wait(&args->sem);
                if (unlikely(ret)) {
                        UNIMPLEMENTED(__DUMP__);
                }

                if (args->retval) {
                        err++;
                }
        }

        yfree((void **)&array);
        
        /* individual segment errors are collapsed into EAGAIN */
        if (err) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run the "unintact" scan over volume @volid (ENABLE_RECOVERY_DEEP build).
 *
 * Pre-flight checks, in order:
 *   - volume_ctl_getattr() must succeed,
 *   - conn_faultdomain() must report at least fileinfo.repnum_usr online
 *     fault domains, otherwise ECANCELED,
 *   - volume_ctl_offline_check() must succeed.
 * Then stage 1 (__volume_ctl_chunk_unintact1 via core_request) yields the
 * chunk count and __volume_ctl_chunk_unintact() scans all indexes with
 * @func3/@arg/@deep.
 *
 * Returns 0 on success or the first error encountered.
 */
int volume_ctl_chunk_unintact(const volid_t *volid, func3_t func3, void *arg, int deep)
{
        int ret;
        uint32_t chknum;
        fileinfo_t fileinfo;
        int online, total;

        DINFO("scan "CHKID_FORMAT" begin\n", CHKID_ARG(volid));

        /*
         * Failures up to ANALYSIS_BEGIN leave through err_ret, which does
         * not call ANALYSIS_END: the pair must only be closed on paths
         * that opened it.
         */
        ret = volume_ctl_getattr(volid, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (online < fileinfo.repnum_usr) {
                DWARN("scan "CHKID_FORMAT" online %u need %u\n", CHKID_ARG(volid),
                      online, fileinfo.repnum_usr);
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        ret = volume_ctl_offline_check(volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);
        ret = core_request(core_hash(volid), SCHEDULE_PRIORITY1,
                           "volume_ctl_chunk_unintact1",
                           __volume_ctl_chunk_unintact1, volid, func3, arg, &chknum);
        if (unlikely(ret))
                GOTO(err_analysis, ret);

        ret = __volume_ctl_chunk_unintact(volid, func3, arg, deep, chknum);
        if (unlikely(ret))
                GOTO(err_analysis, ret);
        
        ANALYSIS_END(0, 1000 * 1000, id2str(volid));
 
        DINFO("scan "CHKID_FORMAT" end\n", CHKID_ARG(volid));
       
        return 0;
err_analysis:
        ANALYSIS_END(0, 1000 * 1000, id2str(volid));
err_ret:
        return ret;
}

#else

/*
 * core_request() handler for the second stage of the "unintact" scan
 * (build without ENABLE_RECOVERY_DEEP).
 *
 * Varargs, in order: volid, func3, arg, index (uint32_t), deep (uint32_t).
 * Looks up the volume's cache entry, runs
 * __volume_ctl_chunk_unintact2__() for that index and releases the entry
 * on every exit path.
 *
 * ENOENT is returned through a plain goto (no GOTO()); other errors go
 * through GOTO().
 *
 * Returns 0 on success or the error code.
 */
STATIC int __volume_ctl_chunk_unintact2(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        func3_t func3 = va_arg(ap, func3_t);
        void *arg = va_arg(ap, void *);
        uint32_t i = va_arg(ap, uint32_t);
        uint32_t deep = va_arg(ap, uint32_t);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* raw chunk at index i */
        ret = __volume_ctl_chunk_unintact2__(entry, func3, arg, i, deep);
        if (ret == ENOENT)
                goto err_release;
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run the "unintact" scan over volume @volid (build without
 * ENABLE_RECOVERY_DEEP).
 *
 * Pre-flight checks, in order:
 *   - volume_ctl_getattr() must succeed,
 *   - conn_faultdomain() must report at least fileinfo.repnum_usr online
 *     fault domains, otherwise ECANCELED,
 *   - volume_ctl_offline_check() must succeed.
 * Stage 1 (__volume_ctl_chunk_unintact1) yields the chunk count. The
 * indexes are then processed in groups of at most
 * RECOVERY_SUBVOL_GROUP_SIZE, ordered by chkid_generator(); each index is
 * one core_request() to __volume_ctl_chunk_unintact2:
 *   - ENOENT            -> index skipped,
 *   - EAGAIN (_errno()) -> retried through USLEEP_RETRY (limit 10, 100ms),
 *   - anything else     -> scan aborted with that error.
 *
 * NOTE(review): `retry` is never reset between indexes, so presumably the
 * retry budget is shared by the whole scan -- confirm against USLEEP_RETRY.
 *
 * Returns 0 on success or the first error encountered.
 */
int volume_ctl_chunk_unintact(const volid_t *volid, func3_t func3, void *arg, int deep)
{
        int ret, retry = 0;
        uint32_t i, chknum, count = 0;
        uint32_t idx = 0;       /* absolute index currently being scanned */
        uint32_t left, off, step;
        fileinfo_t fileinfo;
        int online, total;
        chkid_t chkid;
        generator_t gen;

        DINFO("scan "CHKID_FORMAT" begin\n", CHKID_ARG(volid));

        /*
         * Failures up to ANALYSIS_BEGIN leave through err_ret, which does
         * not call ANALYSIS_END: the pair must only be closed on paths
         * that opened it.
         */
        ret = volume_ctl_getattr(volid, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = conn_faultdomain(&total, &online);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (online < fileinfo.repnum_usr) {
                DWARN("scan "CHKID_FORMAT" online %u need %u\n", CHKID_ARG(volid),
                      online, fileinfo.repnum_usr);
                ret = ECANCELED;
                GOTO(err_ret, ret);
        }

        ret = volume_ctl_offline_check(volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ANALYSIS_BEGIN(0);
        ret = core_request(core_hash(volid), SCHEDULE_PRIORITY1,
                           "volume_ctl_chunk_unintact1",
                           __volume_ctl_chunk_unintact1, volid, func3, arg, &chknum);
        if (unlikely(ret))
                GOTO(err_analysis, ret);

        left = chknum;
        off = 0;

        while (left > 0) {
                step = _min(RECOVERY_SUBVOL_GROUP_SIZE, left);

                chkid_generator_init(&gen, step, FILE_PROTO_EXTERN_ITEM_COUNT);

                while (!chkid_generator(&gen, &i)) {
                        if (count % 500000 == 0) {
                                DINFO("scan "CHKID_FORMAT" %d/%d\n", CHKID_ARG(volid), count, chknum);
                        }

                        count++;

                        /* i is relative to the current group; make it absolute */
                        idx = off + i;

retry:
                        /* the handler reads both trailing arguments as uint32_t */
                        ret = core_request(core_hash(volid), -1, "volume_ctl_chunk_unintact2",
                                           __volume_ctl_chunk_unintact2, volid, func3, arg,
                                           idx, (uint32_t)deep);
                        if (unlikely(ret)) {
                                if (ret == ENOENT) {
                                        continue;
                                } else {
                                        ret = _errno(ret);
                                        if (ret == EAGAIN)  {
                                                USLEEP_RETRY(err_fail, ret, retry, retry, 10, (100 * 1000));
                                        } else {
                                                GOTO(err_analysis, ret);
                                        }
                                }
                        }
                }

                off += step;
                left -= step;
        }

        ANALYSIS_END(0, 1000 * 1000, id2str(volid));

        DINFO("scan "CHKID_FORMAT" end\n", CHKID_ARG(volid));

        return 0;
err_fail:
        /* report the chunk that was actually scanned, not the group-local i */
        fid2cid(&chkid, volid, idx);
        DWARN("scan "CHKID_FORMAT" fail, ret %u\n",
              CHKID_ARG(&chkid), ret);
err_analysis:
        ANALYSIS_END(0, 1000 * 1000, id2str(volid));
err_ret:
        return ret;
}

#endif

/* Number of chunk indexes handled by one stage-2 core_request(). */
#define CHUNK_ITERATOR_STEP 1000
STATIC int __volume_ctl_chunk_iterator2_with_cursor(va_list ap);

/*
 * Call @func2 for every chunk of volume @volid, walking the index space
 * in batches of CHUNK_ITERATOR_STEP.
 *
 * Stage 1 (__volume_ctl_chunk_iterator1) visits the table chunks and
 * yields the chunk count. Stage 2 then issues one core_request() per
 * batch to __volume_ctl_chunk_iterator2_with_cursor, with find = 0 and a
 * cursor that starts at the batch offset. ENOENT from a batch is ignored.
 *
 * Returns 0 on success, or the first error other than ENOENT.
 */
int volume_ctl_chunk_iterator_with_cursor(const volid_t *volid, func2_t func2, void *arg)
{
        int ret;
        /*
         * __volume_ctl_chunk_iterator1 stores the count through a
         * uint32_t *; give it a real uint32_t and widen afterwards, so the
         * upper half of volsize is never left uninitialized.
         */
        uint32_t chknum = 0;
        uint64_t volsize, left, step, cursor, offset;

        ret = core_request(core_hash(volid), -1, "volume_ctl_chunk_iterator1",
                        __volume_ctl_chunk_iterator1, volid, func2, arg, &chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volsize = chknum;

        cursor = 0;
        offset = 0;
        left = volsize;
        while (left > 0) {
                step = _min(left, CHUNK_ITERATOR_STEP);

                /* each batch restarts from its own offset */
                cursor = offset;
                ret = core_request(core_hash(volid), -1, "chunk_iterator2_with_cursor",
                                __volume_ctl_chunk_iterator2_with_cursor, volid, volsize, step, 0, &cursor, func2, arg);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                                /* nothing to do */
                        } else
                                GOTO(err_ret, ret);
                }
                offset = offset + step;
                left -= step;
        }

        return 0;
err_ret:
        return ret;

}

/*
 * Report the volume's chunk count under the read lock of @cent.
 *
 * Copies table2's chknum into *chknum and calls @func2 with @arg, the
 * volume's chkid and a pointer to that same chknum field.
 *
 * Returns 0 on success or the locking error. The reference on @cent is
 * left to the caller.
 */
STATIC int __volume_ctl_chunk_num_iterator1(mcache_entry_t *cent, func2_t func2, void *arg, uint64_t *chknum)
{
        volume_proto_t *volume_proto;
        table2_t *l2;
        int ret;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        l2 = &volume_proto->table2;
        *chknum = l2->chknum;

        YASSERT(is_volume(&volume_proto->chkid));

        /* the callback receives the address of the count, not a chunk info */
        func2(arg, &volume_proto->chkid, &l2->chknum);

        __volume_ctl_unlock(cent);

        return 0;
err_ret:
        return ret;
}

/*
 * core_request() handler behind volume_ctl_chunk_num().
 *
 * Varargs, in order: volid, func2, arg.
 * Looks up the volume's cache entry, runs
 * __volume_ctl_chunk_num_iterator1() on it and releases the entry on every
 * exit path. The count written by the helper is not used here; the result
 * reaches the caller only through @func2.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_chunk_num(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        func2_t func2 = va_arg(ap, func2_t);
        void *arg = va_arg(ap, void *);
        mcache_entry_t *entry;
        uint64_t unused_count;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* volume + sub-volume level */
        ret = __volume_ctl_chunk_num_iterator1(entry, func2, arg, &unused_count);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Public entry point: run __volume_ctl_chunk_num() for @volid through
 * core_request(), keyed by core_hash(volid). The chunk count is delivered
 * to @func2 together with @arg.
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_chunk_num(const volid_t *volid, func2_t func2, void *arg)
{
        int ret = core_request(core_hash(volid), -1, "chunk_num",
                               __volume_ctl_chunk_num, volid, func2, arg);

        if (ret == 0)
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * core_request() handler: visit up to `step` chunk indexes starting at
 * *cursor.
 *
 * Varargs, in order: volid, volsize (uint64_t), step (uint64_t),
 * find (int), cursor (uint64_t *, in/out), func2, arg.
 *
 * Each index is passed to __volume_ctl_chunk_iterator2__(). *cursor
 * advances past every index that returned 0 or ENOENT. The budget of
 * `step` is consumed by every success, and by ENOENT only when `find` is
 * 0; with `find` set, ENOENT indexes are skipped for free, so the walk
 * continues until `step` indexes succeeded or *cursor reaches volsize.
 * Any other error aborts with *cursor still on the failing index.
 *
 * Returns 0 when the walk ends normally, otherwise the aborting error.
 */
STATIC int __volume_ctl_chunk_iterator2_with_cursor(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        uint64_t volsize = va_arg(ap, uint64_t);
        uint64_t budget = va_arg(ap, uint64_t);
        int find = va_arg(ap, int);
        uint64_t *cursor = va_arg(ap, uint64_t *);
        func2_t func2 = va_arg(ap, func2_t);
        void *arg = va_arg(ap, void *);
        mcache_entry_t *entry;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (; budget && *cursor < volsize; (*cursor)++) {
                ret = __volume_ctl_chunk_iterator2__(entry, func2, arg, *cursor);
                if (unlikely(ret) && ret != ENOENT)
                        GOTO(err_release, ret);

                /* a missing index only costs budget when not in find mode */
                if (ret == 0 || !find)
                        budget--;
        }

        __volume_ctl_release(entry);
        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Visit up to @step chunk indexes of @volid starting at *@cursor, through
 * core_request() to __volume_ctl_chunk_iterator2_with_cursor().
 *
 * When @find is set, keep going until @step chunks have actually been
 * found; this is meant for sparse volumes.
 *
 * @volsize  upper bound (exclusive) for *@cursor
 * @cursor   in/out position, advanced by the handler
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_chunk_iterator2_with_cursor(const volid_t *volid, uint64_t volsize, uint64_t step,
                uint64_t find, uint64_t *cursor, func2_t func2, void *arg)
{
        int ret;
        /*
         * The handler fetches this argument with va_arg(ap, int); push an
         * int so that the variadic types match on every ABI.
         */
        int find_flag = find ? 1 : 0;

        ret = core_request(core_hash(volid), -1, "chunk_iterator2_with_cursor",
                        __volume_ctl_chunk_iterator2_with_cursor, volid, volsize, step, find_flag, cursor, func2, arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;

}

/*
 * core_request() handler behind volume_ctl_snapshot_lookup().
 *
 * Varargs, in order: volid, name, chkinfo (output).
 * Looks up the volume's cache entry, takes its read lock and, provided
 * the entry's chkid type is __VOLUME_CHUNK__, calls the volume's
 * snapshot_lookup() method. The lock and the entry are released on every
 * exit path.
 *
 * Returns 0 on success, EISDIR when the entry is not a volume chunk, or
 * the first other error encountered.
 */
STATIC int __volume_ctl_snapshot_lookup(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        chkinfo_t *chkinfo = va_arg(ap, chkinfo_t *);
        mcache_entry_t *entry;
        volume_proto_t *vol;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        if (vol->chkid.type != __VOLUME_CHUNK__) {
                /* reported as an error here rather than asserted */
                ret = EISDIR;
                GOTO(err_lock, ret);
        }

        ret = vol->snapshot_lookup(vol, name, chkinfo);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Public entry point: run __volume_ctl_snapshot_lookup() for @volid
 * through core_request(), keyed by core_hash(volid).
 *
 * @name     snapshot name handed to the volume's snapshot_lookup() method
 * @chkinfo  output buffer handed to the same method
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_snapshot_lookup(const volid_t *volid, const char *name,
                             chkinfo_t *chkinfo)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_lookup",
                               __volume_ctl_snapshot_lookup, volid, name, chkinfo);

        if (ret == 0)
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * core_request() handler behind volume_ctl_snapshot_read().
 *
 * Varargs, in order: volid, io, buf.
 * Looks up the volume's cache entry, takes its read lock and calls the
 * volume's snapshot_read() method with @io and @buf. The whole handler,
 * including the lookup, sits inside one ANALYSIS_BEGIN/ANALYSIS_END pair,
 * which is closed on the success path and on the error path alike.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_snapshot_read(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const io_t *io = va_arg(ap, io_t *);
        buffer_t *buf = va_arg(ap, buffer_t *);

        va_end(ap);

        int ret;
        mcache_entry_t *entry;
        volume_proto_t *vol;

        ANALYSIS_BEGIN(0);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        ret = vol->snapshot_read(vol, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        ANALYSIS_END(0, IO_WARN, NULL);
        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/*
 * core_request() handler behind volume_ctl_snapshot_read_meta().
 *
 * Varargs, in order: volid, io, snap, buf.
 * Looks up the volume's cache entry, takes its read lock and calls the
 * volume's snapshot_read_meta() method with @io, @snap and @buf. The lock
 * and the entry are released on every exit path.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_snapshot_read_meta(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const io_t *io = va_arg(ap, io_t *);
        const char *snap = va_arg(ap, const char*);
        buffer_t *buf = va_arg(ap, buffer_t *);
        mcache_entry_t *entry;
        volume_proto_t *vol;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        ret = vol->snapshot_read_meta(vol, io, snap, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * core_request() handler behind volume_ctl_lower_read().
 *
 * Varargs, in order: volid, io, buf.
 * Looks up the volume's cache entry, takes its read lock and calls the
 * volume's lower_read() method with @io and @buf. The lock and the entry
 * are released on every exit path.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
STATIC int __volume_ctl_lower_read(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const io_t *io = va_arg(ap, io_t *);
        buffer_t *buf = va_arg(ap, buffer_t *);
        mcache_entry_t *entry;
        volume_proto_t *vol;
        int ret;

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        ret = vol->lower_read(vol, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Run volume_proto_snapshot_diff() on the volume identified by @parent.
 *
 * Unlike its neighbours this function does not go through core_request():
 * it looks up the cache entry of @parent directly, takes its read lock
 * and passes the entry's value together with @volid, @snapdst, @buf,
 * @size and @offset to volume_proto_snapshot_diff(). The lock and the
 * entry are released on every exit path.
 *
 * Returns 0 on success or the first non-zero error encountered.
 */
int volume_ctl_snapshot_diff(const volid_t *parent, const volid_t *volid,
                const volid_t *snapdst, buffer_t *buf, size_t size, off_t offset)
{
        mcache_entry_t *entry;
        int ret;

        ret = __volume_ctl_get(&entry, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = volume_proto_snapshot_diff(entry->value, volid, snapdst,
                                         buf, size, offset);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Public entry point: run __volume_ctl_snapshot_read() for @volid through
 * core_request(), keyed by core_hash(volid). @io and @buf are forwarded
 * to the volume's snapshot_read() method.
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_snapshot_read(const volid_t *volid, const io_t *io, buffer_t *buf)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_read",
                               __volume_ctl_snapshot_read, volid, io, buf);

        if (ret == 0)
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/*
 * Public entry point: run __volume_ctl_lower_read() for @volid through
 * core_request(), keyed by core_hash(volid). @io and @buf are forwarded
 * to the volume's lower_read() method.
 *
 * Returns 0 on success, otherwise the error reported by core_request().
 */
int volume_ctl_lower_read(const volid_t *volid, const io_t *io, buffer_t *buf)
{
        int ret = core_request(core_hash(volid), -1, "lower_read",
                               __volume_ctl_lower_read, volid, io, buf);

        if (ret == 0)
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/**
 * Read snapshot metadata: dispatches __volume_ctl_snapshot_read_meta through
 * core_request(), keyed by core_hash(volid), passing @io, @snap and @buf on.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_read_meta(const volid_t *volid, const io_t *io, const char *snap, buffer_t *buf)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_read_meta",
                               __volume_ctl_snapshot_read_meta,
                               volid, io, snap, buf);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_remove().
 *
 * Arguments, in order: const volid_t *volid, const char *name, int force.
 * Under __volume_ctl_rdlock() it first runs the volume's snapshot_check()
 * and, only if that passes, snapshot_remove().
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_remove(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        int force = va_arg(ap, int);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        /*
         * Checklist covered by snapshot_check (per the original note):
         * - the snapshot exists
         * - the snapshot is not in protect mode (client-side)
         * - the snapshot is not in rollback state
         */
        ret = vol->snapshot_check(vol, name);
        if (ret)
                GOTO(err_lock, ret);

        ret = vol->snapshot_remove(vol, name, force);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Remove snapshot @name of @volid: dispatches __volume_ctl_snapshot_remove
 * through core_request(), keyed by core_hash(volid); @force is passed on.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_remove(const volid_t *volid, const char *name, int force)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_remove",
                               __volume_ctl_snapshot_remove,
                               volid, name, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_protect().
 *
 * Arguments, in order: const volid_t *volid, snap_protect_param_t on
 * (passed by value). Calls the volume's snapshot_protect() while holding
 * __volume_ctl_rdlock() on the cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_protect(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const snap_protect_param_t on = va_arg(ap, snap_protect_param_t);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->snapshot_protect(vol, on);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Apply snapshot protection parameters @on to @volid: dispatches
 * __volume_ctl_snapshot_protect through core_request(), keyed by
 * core_hash(volid).
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_protect(const volid_t *volid, const snap_protect_param_t on)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_protect",
                               __volume_ctl_snapshot_protect, volid, on);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_updateparent().
 *
 * Arguments, in order: const volid_t *volid, const char *name,
 * uint64_t from. Calls the volume's snapshot_updateparent() while holding
 * __volume_ctl_rdlock() on the cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_updateparent(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        /*
         * The caller passes `from` as a uint64_t; it must be fetched with the
         * same type. Reading it as int is undefined behaviour and drops the
         * upper 32 bits of the value.
         */
        const uint64_t from = va_arg(ap, uint64_t);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_updateparent(volume_proto, name, from);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Update the parent of snapshot @name on @volid: dispatches
 * __volume_ctl_snapshot_updateparent through core_request(), keyed by
 * core_hash(volid). @from is forwarded as a uint64_t variadic argument.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_updateparent(const volid_t *volid, const char *name, const uint64_t from)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_updateparent",
                               __volume_ctl_snapshot_updateparent,
                               volid, name, from);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_setfrom().
 *
 * Arguments, in order: const volid_t *volid, uint64_t from. Calls the
 * volume's snapshot_setfrom() while holding __volume_ctl_rdlock() on the
 * cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_setfrom(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        /*
         * The caller passes `from` as a uint64_t; it must be fetched with the
         * same type. Reading it as int is undefined behaviour and drops the
         * upper 32 bits of the value.
         */
        const uint64_t from = va_arg(ap, uint64_t);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_setfrom(volume_proto, from);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Set the "from" value of @volid: dispatches __volume_ctl_snapshot_setfrom
 * through core_request(), keyed by core_hash(volid). @from is forwarded as a
 * uint64_t variadic argument.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_setfrom(const volid_t *volid, const uint64_t from)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_setfrom",
                               __volume_ctl_snapshot_setfrom, volid, from);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_prev().
 *
 * Arguments, in order: const volid_t *volid, const fileid_t *snapid,
 * fileid_t *previd, char *name, uint64_t *snap_version. The last three are
 * handed to the volume's snapshot_prev() as non-const pointers.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_prev(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const fileid_t *snapid = va_arg(ap, fileid_t *);
        fileid_t *previd = va_arg(ap, fileid_t *);
        char *name = va_arg(ap, char *);
        uint64_t *snap_version = va_arg(ap, uint64_t *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->snapshot_prev(vol, snapid, previd, name, snap_version);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Query the snapshot preceding @snapid on @volid: dispatches
 * __volume_ctl_snapshot_prev through core_request(), keyed by
 * core_hash(volid). @previd, @name and @snap_version are passed on as
 * writable pointers.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_prev(const volid_t *volid, const fileid_t *snapid, fileid_t *previd, char *name, uint64_t *snap_version)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_prev",
                               __volume_ctl_snapshot_prev,
                               volid, snapid, previd, name, snap_version);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_unlock().
 *
 * Arguments, in order: const volid_t *volid, uuid_t *uuid (must not be the
 * null uuid). Clears the volume's snapshot_lock_uuid and drops the lock and
 * the extra reference, provided @uuid matches the stored one.
 *
 * @return 0 on success, ESTALE when @uuid does not match the stored
 *         snapshot_lock_uuid, or the error from __volume_ctl_get().
 */
STATIC int __volume_ctl_snapshot_unlock(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        uuid_t *uuid = va_arg(ap, uuid_t *);

        va_end(ap);
        YASSERT(!uuid_is_null(*uuid));

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;

        /* Only the holder of the matching uuid may unlock. */
        if (uuid_compare(volume_proto->snapshot_lock_uuid, *uuid)) {
                ret = ESTALE;
                GOTO(err_release, ret);
        }

        /*
         * The try-lock is asserted to FAIL here.
         * NOTE(review): presumably this verifies that the write lock taken in
         * __volume_ctl_snapshot_wrlock is still held -- confirm against
         * mcache_tryrdlock().
         */
        ret = mcache_tryrdlock(cent);
        YASSERT(ret != 0);

        uuid_clear(volume_proto->snapshot_lock_uuid);
        __volume_ctl_unlock(cent);

        /* Drops the reference taken by __volume_ctl_get() above. */
        __volume_ctl_release(cent);

        /* Second release: balances the reference that
         * __volume_ctl_snapshot_wrlock kept when it returned successfully. */
        __volume_ctl_release(cent);

        DINFO("snapshot unlock "CHKID_FORMAT" unlocked\n", CHKID_ARG(volid));

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Release the snapshot lock identified by @uuid on @volid.
 *
 * Asserts that @uuid is not the null uuid, then dispatches
 * __volume_ctl_snapshot_unlock through core_request(), keyed by
 * core_hash(volid), handing it the address of a local copy of @uuid.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_unlock(const volid_t *volid, const uuid_t uuid)
{
        uuid_t lock_id;

        YASSERT(!uuid_is_null(uuid));
        uuid_copy(lock_id, uuid);

        int ret = core_request(core_hash(volid), -1, "snapshot_unlock",
                               __volume_ctl_snapshot_unlock, volid, &lock_id);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Heap-allocated argument handed from __volume_ctl_snapshot_unlock_timer()
 * to the __volume_ctl_snapshot_unlock__() task, which frees it.
 */
typedef struct {
        volid_t volid;  /* volume whose snapshot lock is to be released */
        uuid_t uuid;    /* uuid of the lock, copied at scheduling time */
} unlock_arg_t;

/**
 * Task body scheduled by __volume_ctl_snapshot_unlock_timer().
 *
 * Sleeps via schedule_sleep() (argument 1000 * 1000 * 30; presumably
 * microseconds, i.e. 30 seconds -- confirm), then tries to release the
 * snapshot lock recorded in @arg. ESTALE is the quiet outcome: it is what
 * __volume_ctl_snapshot_unlock returns when the stored uuid no longer
 * matches. Any other result, including a successful unlock, is logged as an
 * error. @arg (an unlock_arg_t) is freed before returning.
 */
STATIC void __volume_ctl_snapshot_unlock__(void *arg)
{
        unlock_arg_t *pending = (unlock_arg_t *)arg;
        int ret;

        schedule_sleep("snapshot_unlock_timer", 1000 * 1000 * 30);

        ret = volume_ctl_snapshot_unlock(&pending->volid, pending->uuid);
        if (ret == 0) {
                /* The lock was still in place when the timer fired. */
                DERROR("snapshot unlocked here, pre op failed\n");
        } else if (ret != ESTALE) {
                DERROR("snapshot unlock err (%d)\n", ret);
        }

        yfree((void **)&arg);
}

/**
 * Schedule the delayed unlock task for the snapshot lock @uuid on @volid.
 *
 * Allocates an unlock_arg_t, fills it with copies of @volid and @uuid and
 * hands it to schedule_task_new() with __volume_ctl_snapshot_unlock__ as
 * the task body; that task frees the argument.
 *
 * @return 0 on success, otherwise the error from ymalloc().
 */
STATIC int __volume_ctl_snapshot_unlock_timer(const volid_t *volid, uuid_t uuid)
{
        unlock_arg_t *task_arg;
        int ret = ymalloc((void **)&task_arg, sizeof(unlock_arg_t));

        if (ret)
                GOTO(err_ret, ret);

        uuid_copy(task_arg->uuid, uuid);
        task_arg->volid = *volid;

        schedule_task_new("snapshot_unlock_timer", __volume_ctl_snapshot_unlock__, task_arg, -1);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_wrlock().
 *
 * Arguments, in order: const volid_t *volid, uuid_t *_uuid (output).
 * Takes the entry with __volume_ctl_wrlock_prio(), generates a fresh uuid,
 * stores it in the volume's snapshot_lock_uuid and returns it through
 * @_uuid, then schedules the delayed unlock task.
 *
 * On success the lock and the reference from __volume_ctl_get() are
 * deliberately kept; __volume_ctl_snapshot_unlock drops both.
 *
 * @return 0 on success, EBUSY when a snapshot lock uuid is already set,
 *         otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_wrlock(va_list ap)
{
        int ret, retry = 0;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        const volid_t *volid = va_arg(ap, volid_t *);
        uuid_t *_uuid = va_arg(ap, uuid_t *);

        va_end(ap);

        /* NOTE(review): ANALYSIS_BEGIN/ANALYSIS_END look like timing probes
         * (slot 0 = whole call, slot 1 = lock acquisition) -- confirm. */
        ANALYSIS_BEGIN(0);

retry:
        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(1);

        ret = __volume_ctl_wrlock_prio(cent);
        if (unlikely(ret)) {
                if (ret == ESTALE) {
                        /* Drop the reference before retrying; the retry path
                         * looks the entry up again.
                         * NOTE(review): USLEEP_RETRY presumably sleeps 100 ms
                         * and jumps to `retry` up to 3 times, then leaves via
                         * err_ret -- confirm against the macro definition. */
                        __volume_ctl_release(cent);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (100 * 1000));
                } else
                        GOTO(err_release, ret);
        }

        volume_proto = cent->value;
        /* A non-null uuid means a snapshot lock is already outstanding. */
        if (!uuid_is_null(volume_proto->snapshot_lock_uuid)) {
                ret = EBUSY;
                GOTO(err_lock, ret);
        }

        uuid_generate(*_uuid);
        uuid_copy(volume_proto->snapshot_lock_uuid, *_uuid);

        ANALYSIS_END(1, IO_WARN, NULL);

        /* Schedules the task that later calls volume_ctl_snapshot_unlock()
         * with this uuid. */
        ret =  __volume_ctl_snapshot_unlock_timer(volid, *_uuid);
        if (ret)
                GOTO(err_clear, ret);

        /* The reference is intentionally not released here; it is released
         * at unlock time. */
        //__volume_ctl_release(cent);

        ANALYSIS_END(0, IO_WARN, NULL);

        DINFO("snapshot lock "CHKID_FORMAT" locked\n", CHKID_ARG(volid));

        return 0;
err_clear:
        uuid_clear(volume_proto->snapshot_lock_uuid);
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Take the snapshot lock on @volid: dispatches __volume_ctl_snapshot_wrlock
 * through core_request(), keyed by core_hash(volid). The handler writes the
 * generated lock uuid to @_uuid.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_wrlock(const volid_t *volid, uuid_t *_uuid)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_wrlock",
                               __volume_ctl_snapshot_wrlock, volid, _uuid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_create_nolock().
 *
 * Arguments, in order: const volid_t *volid, const char *name,
 * const char *_site, uuid_t *uuid. Takes no lock of its own: it requires
 * @uuid to match the volume's snapshot_lock_uuid and asserts that
 * mcache_tryrdlock() fails before calling snapshot_create().
 *
 * @return 0 on success, ESTALE on uuid mismatch, otherwise the error code
 *         of the step that failed.
 */
STATIC int __volume_ctl_snapshot_create_nolock(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        const char *_site = va_arg(ap, char *);
        uuid_t *uuid = va_arg(ap, uuid_t *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);
        YASSERT(!uuid_is_null(*uuid));

        /* The caller must present the uuid of the current snapshot lock. */
        if (uuid_compare(vol->snapshot_lock_uuid, *uuid)) {
                ret = ESTALE;
                GOTO(err_release, ret);
        }

        /* The try-lock is expected to fail here. */
        ret = mcache_tryrdlock(entry);
        YASSERT(ret != 0);

        ret = vol->snapshot_create(vol, name, FALSE, _site, 0);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        DINFO("snapshot create "CHKID_FORMAT"@%s\n", CHKID_ARG(volid), name);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Create snapshot @name on @volid under an already-held snapshot lock.
 *
 * Asserts that @uuid is not the null uuid, then dispatches
 * __volume_ctl_snapshot_create_nolock through core_request(), keyed by
 * core_hash(volid), handing it the address of a local copy of @uuid.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_create_nolock(const volid_t *volid, const char *name,
                                      const char *_site, const uuid_t uuid)
{
        uuid_t lock_id;

        YASSERT(!uuid_is_null(uuid));
        uuid_copy(lock_id, uuid);

        int ret = core_request(core_hash(volid), -1, "snapshot_create_nolock",
                               __volume_ctl_snapshot_create_nolock,
                               volid, name, _site, &lock_id);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_remove_nolock().
 *
 * Arguments, in order: const volid_t *volid, const char *name,
 * uuid_t *uuid. Takes no lock of its own: it requires @uuid to match the
 * volume's snapshot_lock_uuid and asserts that mcache_tryrdlock() fails,
 * then runs snapshot_check() followed by snapshot_remove() with force 0.
 *
 * @return 0 on success, ESTALE on uuid mismatch, otherwise the error code
 *         of the step that failed.
 */
STATIC int __volume_ctl_snapshot_remove_nolock(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        uuid_t *uuid = va_arg(ap, uuid_t *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);
        YASSERT(!uuid_is_null(*uuid));

        /* The caller must present the uuid of the current snapshot lock. */
        if (uuid_compare(vol->snapshot_lock_uuid, *uuid)) {
                ret = ESTALE;
                GOTO(err_release, ret);
        }

        /* The try-lock is expected to fail here. */
        ret = mcache_tryrdlock(entry);
        YASSERT(ret != 0);

        ret = vol->snapshot_check(vol, name);
        if (ret)
                GOTO(err_release, ret);

        ret = vol->snapshot_remove(vol, name, 0);
        if (unlikely(ret))
                GOTO(err_release, ret);

        __volume_ctl_release(entry);

        DINFO("snapshot remove "CHKID_FORMAT" snapshot (%s) \n", CHKID_ARG(volid), name);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Remove snapshot @name on @volid under an already-held snapshot lock.
 *
 * Asserts that @uuid is not the null uuid, then dispatches
 * __volume_ctl_snapshot_remove_nolock through core_request(), keyed by
 * core_hash(volid), handing it the address of a local copy of @uuid.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_remove_nolock(const volid_t *volid, const char *name, const uuid_t uuid)
{
        uuid_t lock_id;

        YASSERT(!uuid_is_null(uuid));
        uuid_copy(lock_id, uuid);

        int ret = core_request(core_hash(volid), -1, "snapshot_remove_nolock",
                               __volume_ctl_snapshot_remove_nolock,
                               volid, name, &lock_id);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_check().
 *
 * Arguments, in order: const volid_t *volid, const char *name. Calls the
 * volume's snapshot_check() while holding __volume_ctl_rdlock() on the
 * cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_check(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->snapshot_check(vol, name);
        if (ret)
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Run the snapshot check for @name on @volid: dispatches
 * __volume_ctl_snapshot_check through core_request(), keyed by
 * core_hash(volid).
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_check(const volid_t *volid, const char *name)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_check",
                               __volume_ctl_snapshot_check, volid, name);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_snapshot_isempty().
 *
 * Arguments, in order: const volid_t *volid, int *empty (output, filled by
 * the volume's snapshot_isempty()). Runs under __volume_ctl_rdlock().
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_snapshot_isempty(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        int *empty = va_arg(ap, int *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->snapshot_isempty(vol, empty);
        if (ret)
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Ask whether @volid has no snapshots: dispatches
 * __volume_ctl_snapshot_isempty through core_request(), keyed by
 * core_hash(volid). The answer is written to @empty by the handler.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_snapshot_isempty(const volid_t *volid, int *empty)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_isempty",
                               __volume_ctl_snapshot_isempty, volid, empty);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Initialise the volume controller.
 *
 * Creates the module-level cache __vc_cache__ with mcache_init() (sized by
 * @max_chunk, named "volume_ctl") and then runs __volume_ctl_loader_init().
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
int volume_ctl_init(uint64_t max_chunk)
{
        int ret = mcache_init(&__vc_cache__, max_chunk, __cmp, __hash, __core_hash,
                              __drop, 1, "volume_ctl");

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_loader_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* Recycle initialisation is compiled out. */
#if 0
        ret = __volume_ctl_recycle_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_connect().
 *
 * Arguments, in order: const volid_t *volid, const nid_t *nid,
 * const char *addr. Calls the volume's iscsiConnect() while holding
 * __volume_ctl_rdlock() on the cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_connect(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        const char *addr = va_arg(ap, char *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->iscsiConnect(vol, nid, addr);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Register a connection (@nid, @addr) on @volid: dispatches
 * __volume_ctl_connect through core_request(), keyed by core_hash(volid).
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_connect(const volid_t *volid, const nid_t *nid, const char *addr)
{
        int ret = core_request(core_hash(volid), -1, "vctl_connect",
                               __volume_ctl_connect, volid, nid, addr);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_disconnect().
 *
 * Arguments, in order: const volid_t *volid, const nid_t *peer,
 * const char *addr. Calls the volume's iscsiDisconnect() while holding
 * __volume_ctl_rdlock() on the cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_disconnect(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const nid_t *peer = va_arg(ap, nid_t *);
        const char *addr = va_arg(ap, char *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->iscsiDisconnect(vol, peer, addr);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Drop the connection (@peer, @addr) on @volid: dispatches
 * __volume_ctl_disconnect through core_request(), keyed by core_hash(volid).
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_disconnect(const volid_t *volid, const nid_t *peer, const char *addr)
{
        int ret = core_request(core_hash(volid), -1, "vctl_disconnect",
                               __volume_ctl_disconnect, volid, peer, addr);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_connection().
 *
 * Arguments, in order: const volid_t *volid, const nid_t *nid, void *list,
 * int *count. @list and @count are handed unchanged to the volume's
 * iscsiConnection(), which runs under __volume_ctl_rdlock().
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_connection(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        void *list = va_arg(ap, void *);
        int *count = va_arg(ap, int *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->iscsiConnection(vol, nid, list, count);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Query connections of @volid: dispatches __volume_ctl_connection through
 * core_request(), keyed by core_hash(volid). @list and @count are passed on
 * to the handler as-is.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_connection(const volid_t *volid, const nid_t *nid, void *list, int *count)
{
        int ret = core_request(core_hash(volid), -1, "vctl_connection",
                               __volume_ctl_connection,
                               volid, nid, list, count);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler behind volume_ctl_vfm_cleanup().
 *
 * Arguments, in order: const volid_t *volid, const chkid_t *tid. Calls the
 * volume's vfm_cleanup() while holding __volume_ctl_rdlock() on the cached
 * entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_vfm_cleanup(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *tid = va_arg(ap, chkid_t *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->vfm_cleanup(vol, tid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Run the vfm cleanup for @tid on @volid: logs at debug level, then
 * dispatches __volume_ctl_vfm_cleanup through core_request(), keyed by
 * core_hash(volid).
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_vfm_cleanup(const volid_t *volid, const chkid_t *tid)
{
        DBUG("vfm cleanup "CHKID_FORMAT"\n", CHKID_ARG(volid));

        int ret = core_request(core_hash(volid), -1, "vctl_vfm_cleanup",
                               __volume_ctl_vfm_cleanup, volid, tid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


/**
 * core_request handler behind volume_ctl_vfm_add().
 *
 * Arguments, in order: const volid_t *volid, const nid_t *nid, int count.
 * Calls the volume's vfm_add() while holding __volume_ctl_rdlock() on the
 * cached entry.
 *
 * @return 0 on success, otherwise the error code of the step that failed.
 */
STATIC int __volume_ctl_vfm_add(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        int count = va_arg(ap, int);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret = __volume_ctl_get(&entry, volid);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(vol->chkid.type == __VOLUME_CHUNK__);

        ret = vol->vfm_add(vol, nid, count);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Add @count entries from @nid to the vfm of @volid: logs at debug level,
 * then dispatches __volume_ctl_vfm_add through core_request(), keyed by
 * core_hash(volid). Unlike its sibling wrappers it passes
 * SCHEDULE_PRIORITY1 rather than -1 as the second core_request() argument.
 *
 * @return 0 on success, otherwise the error returned by core_request().
 */
int volume_ctl_vfm_add(const volid_t *volid, const nid_t *nid, int count)
{
        DBUG("vfm add "CHKID_FORMAT"\n", CHKID_ARG(volid));

        int ret = core_request(core_hash(volid), SCHEDULE_PRIORITY1,
                               "vctl_vfm_add", __volume_ctl_vfm_add,
                               volid, nid, count);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * core_request handler used by __volume_proto_destroy().
 *
 * Argument: const volid_t *volid. Logs the volume id, then calls
 * volume_proto_destroy() on the cached volume while holding
 * __volume_ctl_rdlock(). Nothing can fail once the lock is held, so there
 * is no unlock-on-error path.
 *
 * @return 0 on success, otherwise the error from the lookup or lock step.
 */
STATIC int __do_volume_proto_destroy(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);

        va_end(ap);

        volume_proto_t *vol;
        mcache_entry_t *entry;
        int ret;

        DWARN("vol %s\n", id2str(volid));

        /* Takes a reference; undone at err_release on failure below. */
        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        vol = entry->value;
        YASSERT(is_volume(&vol->chkid));

        volume_proto_destroy(vol);

        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * mcache_iterator callback used by volume_ctl_destroy_all().
 *
 * @arg is unused; @ent is the mcache entry. Dispatches
 * __do_volume_proto_destroy for the entry's volume through core_request()
 * and only logs a warning if that fails.
 */
STATIC void __volume_proto_destroy(void *arg, void *ent)
{
        mcache_entry_t *entry = ent;
        volume_proto_t *vol = entry->value;
        int ret;

        (void)arg;

        ret = core_request(core_hash(&vol->chkid), -1, "volume_proto_destroy",
                           __do_volume_proto_destroy, &vol->chkid);
        if (unlikely(ret)) {
                DWARN("ret %d\n", ret);
        }
}

/**
 * Triggered when the cluster is shut down; this is the last chance to flush
 * each volume.
 *
 * Walks every entry of __vc_cache__ with mcache_iterator() and applies
 * __volume_proto_destroy to it.
 *
 * @return always 0
 */
int volume_ctl_destroy_all()
{
        /* Per-volume failures are only logged by the callback. */
        mcache_iterator(__vc_cache__, __volume_proto_destroy, NULL);

        return 0;
}

/**
 * mcache_iterator callback used by volume_ctl_dump_memory().
 *
 * Estimates the memory used by one cached volume as the sum of:
 *   - the volume_proto_t object itself,
 *   - table1->table_count * BYTES_PER_MB,
 *   - table2->chknum * (sizeof(chkinfo_t) + sizeof(chkstat_t)),
 * logs it in MB and adds it to the running total.
 *
 * @param arg  uint64_t accumulator the estimate is added to
 * @param ent  mcache entry whose value is the volume_proto_t
 */
STATIC void __volume_proto_dump_memory(void *arg, void *ent)
{
        uint64_t memory = 0;
        mcache_entry_t *cent = ent;
        volume_proto_t *volume_proto = cent->value;
        table1_t *table1 = &volume_proto->table1;
        table2_t *table2 = &volume_proto->table2;

        uint64_t *total = arg;

        /* sizeof(*volume_proto) is the size of the object; sizeof on the
         * pointer itself would only count the pointer width. */
        memory = sizeof(*volume_proto) +
                 table1->table_count * BYTES_PER_MB +
                 table2->chknum * (sizeof(chkinfo_t) + sizeof(chkstat_t));

        DINFO("chkid %s memory %.3f MB\n", id2str(&volume_proto->chkid), 1.0 * memory/BYTES_PER_MB);

        *total += memory;
}

/**
 * Get the memory usage of the volume controller.
 *
 * Resets *@memory to 0 and, when the cache has been initialised, sums the
 * per-volume estimates produced by __volume_proto_dump_memory into it. If
 * the cache is not initialised an error is logged and *@memory stays 0.
 *
 * @return always 0
 */
int volume_ctl_dump_memory(uint64_t *memory)
{
        *memory = 0;

        if (!__vc_cache__) {
                DERROR("volume controller not inited\n");
                return 0;
        }

        mcache_iterator(__vc_cache__, __volume_proto_dump_memory, memory);

        return 0;
}
